Ayende @ Rahien

My name is Ayende Rahien
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:


+972 52-548-6969


Buffer Managers, production code and alternative implementations

We are porting RavenDB to Linux, and as such, we run into a lot of… interesting issues. Today we ran into a really annoying one.

We make use of the BufferManager class inside RavenDB to reduce memory allocations. On the .NET side of things, everything works just fine, and we never really had any issues with it.

On the Mono side of things, we started getting all sorts of weird errors, from ArgumentOutOfRangeException to NullReferenceException to just plain weird stuff. That was the time to dig in and look into what is going on.

On the .NET side of things, the BufferManager implementation is based on a selection criterion between large (more than 85 KB) and small buffers. For large buffers, there is a single large pool that is shared among all the users of the pool. For small buffers, the BufferManager uses a pool per active thread as well as a global pool, etc. In fact, looking at the code we see that it is really nice, and a lot of effort has been made to harden it and make it work nicely for many scenarios.

The Mono implementation, on the other hand, decides to blithely discard the API contract by ignoring the maximum buffer pool size. Apparently this is because “no user code is designed to cope with this”. Considering the fact that RavenDB is certainly dealing with that, I’m somewhat insulted, but it seems par for the course for Linux, where “memory is infinite until we kill you”* is the way to go.

But what is far worse is that this class is absolutely not thread safe. That was a lot of fun to discover. Considering that this piece of code is pretty central for the entire WCF stack, I’m not really sure how that worked. We ended up writing our own BufferManager impl for Mono, to avoid those issues.
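To make the two requirements concrete (honor the maximum pool size, and be safe to call from many threads), here is a minimal sketch in Python. It is not the replacement we wrote for Mono and not the .NET implementation; the class name `BoundedBufferPool` and its methods are made up for illustration, and it assumes a single lock and exact-size buckets are good enough.

```python
import threading


class BoundedBufferPool:
    """Hypothetical thread-safe pool that honors a maximum pooled size."""

    def __init__(self, max_pool_bytes: int) -> None:
        self._max_pool_bytes = max_pool_bytes
        self._pooled_bytes = 0
        self._free: dict[int, list[bytearray]] = {}  # buffer size -> idle buffers
        self._lock = threading.Lock()

    def take(self, size: int) -> bytearray:
        with self._lock:
            bucket = self._free.get(size)
            if bucket:
                self._pooled_bytes -= size
                return bucket.pop()
        # Nothing pooled for this size: allocate outside the lock.
        return bytearray(size)

    def give_back(self, buffer: bytearray) -> None:
        size = len(buffer)
        with self._lock:
            if self._pooled_bytes + size > self._max_pool_bytes:
                return  # over budget: drop it and let the GC reclaim the memory
            self._free.setdefault(size, []).append(buffer)
            self._pooled_bytes += size

    @property
    def pooled_bytes(self) -> int:
        with self._lock:
            return self._pooled_bytes
```

The point of the sketch is the `give_back` path: once the pool holds its configured maximum, returned buffers are simply dropped instead of growing the pool without bound.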

* Yes, somewhat bitter here, I’ll admit. The next post will discuss this in detail.

.NET Packaging mess

In the past few years, we had:

  • .NET Full
  • .NET Micro
  • .NET Client Profile
  • .NET Silverlight
  • .NET Portable Class Library
  • .NET WinRT
  • Core CLR
  • Core CLR (Cloud Optimized)*
  • MessingWithYa CLR

* Can’t care enough to figure out if this is the same as the previous one or not.

In each of those cases, they offered similar, but not identical, APIs and options. That is completely ignoring the versioning side of things, where we have .NET 2.0 (1.0 finally died a while ago), .NET 3.5, .NET 4.0 and .NET 4.5. I don’t think that something can be done about versioning, but the packaging issue is painful.

Here is a small example why:


In each case, we need to subtly tweak the system to accommodate the new packaging option. This is pure additional cost to the system, with zero net benefit. Each time that we have to do that, we add a whole new dimension to the testing and support matrix, leaving aside the fact that the complexity of the solution is increasing.

I wouldn’t mind it so much, if it weren’t for the fact that a lot of those are effectively drive-bys, it feels. Silverlight took a lot of effort, and it is dead. WinRT took a lot of effort, and it is effectively dead.

This adds a real cost in time and effort, and it is hurting the platform as a whole.

Now users are running into issues with the Core CLR not supporting stuff that we use. So we need to rip out MEF from some of our code, and implement it ourselves just to get things in the same place as before.

Gossip much? Use cases and bad practices for gossip protocols

My previous few posts have talked about specific algorithms for gossip protocols, specifically HyParView and Plumtrees. They dealt with the technical behavior of the system, the process in which we are sending data over the cluster to all the nodes. In this post, I want to talk a bit about what kind of messages we are going to send in such a system.

The obvious one is to try to keep the entire state of the system up to date using gossip. So whenever we make a change, we gossip about it to the entire network, and we are able to get to an eventually consistent system in which all nodes have roughly the same data. There is one problem with that: you now have a lot of nodes with the same data on them. At some point, that stops making sense. Usually gossip is used when you have a large group of servers, and just keeping all the data on all the nodes is not a good idea unless your data set is very small.

So you don’t do that. Gossip is usually used to disseminate a small data set, one that can fit comfortably inside a single machine (usually it is a very small data set, a few hundred MB at most). Let us consider a few types of messages that would fit in a gossip setting.

The obvious example is the actual topology of the whole network. A node joining the cluster will announce its presence, and that will percolate to the entire cluster, eventually. That can allow you to have an idea (note, this isn’t a certainty) about the structure of the cluster, and maybe make decisions based on it.

The system wide configuration data is also a good candidate for gossip. For example, you can use gossip as a distributed service locator in the cluster. Whenever a new SMTP server comes online, it announces itself via gossip to the cluster. It is added to the list of SMTP servers in all the nodes that heard about it, and then it gets used. In this kind of system, you have to take into account that servers can be down for a long period of time, and miss out on messages. Gossip does not guarantee that the messages will arrive, after all. Oh, it is going to do its best, but you need to also build an anti entropy system. If a server finds that it missed out on too much, it can request one of its peers to send it a full snapshot of the current global state as that peer knows it.

Going in the same vein, nodes can gossip about the health state of the network. If I’m trying to send an email via an SMTP server, and it is down, I’m going to try another server, and let the network know that I’ve failed to talk to that particular server. If enough nodes fail to communicate with the server, that becomes part of the state of the system, so nodes that learned about it can avoid that server for a period of time.

Moving in a different direction, you can also do gossip queries. That can be done by sending a gossip message on the cluster with a specific query in it. A typical example might be “which node has a free 10GB that I can use?”. Such queries typically carry with them a timeout element. You send the query, and any matches are sent back to you (either directly or also via gossip). After a predefined timeout, you can assume that you got all the replies that you are going to get, so you can operate on that. More interesting is when you want to query for the actual data held in each node, if we want to find all the users who logged in today, for example.

The problem with doing something like that is that you might have a large result set, and you’ll need to have some way to work with that. You don’t want to send it all to a single destination, and what would you do with it, anyway? For that reason, most of the time gossip queries are actually aggregations. We can use that to get an estimate of certain things in our cluster. If we wanted to get the number of users per country, that would be a good candidate for this, for example. Note that you won’t necessarily get accurate results if you have failures, so there are aggregation methods for getting a good probability of the actual value.

For fun, here is an interesting exercise. Look at trending topics in a large number of conversations. In this case, whenever you get a new message, you analyze the topics for this message, and periodically (every second, let us say), you gossip to your peers about this. In this case, we don’t just blindly pass the gossip between nodes. Instead, we’ll use a slightly different method. Each second, every node will contact its peers to send them the current trending topics in the node. Each time the trending topics change, a version number is incremented. In addition, the node also sends its peer the node ids and versions of the messages it got from other nodes. The peer, in reply, will send a confirmation about all the node ids and versions that it has. So the origin node can fill the peer in about any new information that it got, or ask to get updates for information that it doesn’t have.

This reduces the number of updates that flow throughout the cluster, while still maintaining an eventually consistent model. We’ll be able to tell, from each node, what the current trending topics are globally.
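The version exchange described above can be sketched roughly like this. It is an in-memory illustration only: `TopicNode` and `exchange` are hypothetical names, a real system would do this over the network once a second, and I'm assuming that a higher version number for a node id always wins.

```python
from dataclasses import dataclass, field


@dataclass
class TopicNode:
    node_id: str
    # node id -> (version, trending topics) for every node we have heard about
    known: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        self.known[self.node_id] = (0, [])

    def set_local_topics(self, topics: list) -> None:
        version, current = self.known[self.node_id]
        if topics != current:
            # The version only moves when the trending topics actually change.
            self.known[self.node_id] = (version + 1, list(topics))

    def digest(self) -> dict:
        """Node ids and versions only, no topic data."""
        return {node_id: entry[0] for node_id, entry in self.known.items()}

    def newer_than(self, remote_digest: dict) -> dict:
        """Entries the other side is missing or holds at an older version."""
        return {
            node_id: entry
            for node_id, entry in self.known.items()
            if entry[0] > remote_digest.get(node_id, -1)
        }

    def merge(self, updates: dict) -> None:
        for node_id, entry in updates.items():
            if entry[0] > self.known.get(node_id, (-1, []))[0]:
                self.known[node_id] = entry


def exchange(origin: TopicNode, peer: TopicNode) -> None:
    """One gossip round: swap digests, then send only what the other side lacks."""
    origin_digest = origin.digest()
    peer_digest = peer.digest()
    peer.merge(origin.newer_than(peer_digest))
    origin.merge(peer.newer_than(origin_digest))
```

Because only the digests go over the wire on every round, the full topic lists are transferred just once per change and per link, which is where the saving comes from.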

Gossip much? Operating with partial information, and getting the right result.

Unlike the previous two posts, this is going to be short. Primarily because what I wanted to talk about is what impresses me most with both HyParView and Plumtree. The really nice thing about them is that they are pretty simple, easy to understand and produce good results.

But the fun part, and what makes it impressive, is that they manage to achieve that with a small set of simple rules, and without any attempt to create a global view. Each node sees potentially only a very small part of the overall data, but they still manage to operate, self optimize and get to the correct result. In fact, I did some very minor attempts to play with this at large scale, and we see a pretty amazing emergent behavior. Without anyone knowing what is going on globally, we are able to get to the optimal number of interactions in the cluster to distribute information.

That is really pretty cool.

And because this post is too short, I’ll leave you with a question. Given that you have this kind of infrastructure, what would you do with it? What sort of information or operations would you try to send using this way?

Gossip much? The gossip epidemic and other issues in polite society

In my previous post, I talked about the Hybrid Partial View protocol, and showed a visualization of how it actually works. Something that is important to note about this protocol is that it is mostly meant to create a gossip topology that is resilient to failure. It is not meant to actually send messages, it is meant to serve as the backbone topology (the peer sampling service) for figuring out what the nodes are.

The reason for that can be seen in the following 10 node cluster (after running heartbeat enough times to get to a stable state):


Let us assume that we want to disseminate a message across the cluster. We select node F as our root, and then send a message. The rules are as follows:

  • Each node sends the message to all its active connections (except the sender, of course).
  • A node that got the same message twice will ignore the message.

Based on those rules, and the topology above, we’re going to have the following chain of messages:

  • F – initial broadcast
  • F -> E, G, J
  • E -> G, I
  • G -> E, H
  • J -> I, H, J
  • H -> C, I
  • C -> B, A
  • B -> D, A
  • A -> B, D
  • D -> A

The total number of messages passed is 20, which is twice as many as the optimal solution would generate.
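If you want to play with those two rules yourself, here is a small sketch that counts the messages an eager flood generates. The topology in the example is a made-up five node graph, not the ten node cluster from the visualization, and `eager_broadcast` is just a name I picked for illustration.

```python
from collections import deque


def eager_broadcast(topology: dict, root: str) -> int:
    """Return how many messages a flood starting at `root` puts on the wire."""
    seen = {root}
    pending = deque((root, peer) for peer in sorted(topology[root]))
    sent = 0
    while pending:
        sender, receiver = pending.popleft()
        sent += 1
        if receiver in seen:
            continue  # rule 2: a node that already got the message ignores it
        seen.add(receiver)
        # Rule 1: forward to every active connection except the sender.
        for peer in sorted(topology[receiver]):
            if peer != sender:
                pending.append((receiver, peer))
    return sent


# Hypothetical topology: a ring A-B-C-D-E-A with one extra link between A and C.
example_topology = {
    "A": {"B", "C", "E"},
    "B": {"A", "C"},
    "C": {"A", "B", "D"},
    "D": {"C", "E"},
    "E": {"A", "D"},
}

print(eager_broadcast(example_topology, "A"))  # 8 messages for 5 nodes
```

In this toy graph a spanning tree would reach the other four nodes with 4 messages, so the flood again costs about twice what it has to.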

What is worse, this is a very small network, and as the network grows, so will the number of redundant messages. This approach (called eager gossiping) has a major advantage, because it will traverse all paths in the graph, it will also traverse all the shortest paths. That means that the time to get a message from the origin to all nodes is the smallest, but the number of operations is high.

The Plumtree paper (Epidemic Broadcast Trees) presents a solution to this problem. It tries to minimize the number of messages that are passed, while still maintaining reliability and keeping the distance that each message has to travel short.

The way Plumtree works is explained in quite beautiful detail in the paper, but the underlying idea goes like this: we start using the same approach as the eager gossiping, but whenever we get a message that we already got, we will reply to the source and tell it to stop sending us further messages. This is done so the next time that a message is sent, we can skip the known duplicate path, and reduce the number of overall messages that we have.

So the first run is going to generate 20 messages on the network. The second is going to generate just 13. You can see the non traversed paths in the following image:


Note that we didn’t pass any messages between I and J, or D and A. But a lot of the saving was achieved by avoiding duplicate notifications. So node I notified node H, but not vice versa. The next time we’ll run this, we have exactly 10 messages passing:


Now, obviously this is pretty cool, but that is under a stable state. What happens when there are failures? Well, at that point, the notion of lazy vs. eager peers comes into play. One of the things we did initially was to clear the duplicate paths in the network, so we can optimize the number of messages being passed. That is pretty cool, but it also leaves us vulnerable to failures. For example, imagine that node H is down. What happens then?

There are two aspects of this that are interesting. Plumtrees only care about the notion of message passing. They don’t deal with topology changes. In this case, the responsibility to join the different parts of the network lies with the peer sampling service, which is HyParView in this case. That would figure out the communication issue, and forge new connections with the remaining nodes. Plumtree will get notified about that, and the process continues.

But let us leave that one aside. Let us say that we have a static topology, how would Plumtree handle this? Well, at this point you have to realize that Plumtree doesn’t just drop a connection when a node tells it that it already heard about a message. It just moves it to a lazy state. Periodically, a node will contact other nodes which told it that it wasn’t needed and tell them: “Hi, I got messages with ids (43,41,81), do you have them?”. In this way, a node whose contact point went down would become aware that there are missing messages. At that point, it starts a timer, and if it doesn’t hear about those missing messages in time, it will ask the node that told it about those messages to send them over, and initiate an active link. The end result here is that we send additional messages, but those tend to be pretty small, just the message ids.

During steady state, we’ll just ignore those messages, but if there is a failure, they can help us recover from errors by letting us know that there are messages that we are missing, and taking action to recover that.
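Here is a rough in-memory sketch of the eager / lazy bookkeeping described above. It is not the code from the paper or from my JavaScript implementation: `PlumtreeNode` and its method names are hypothetical, delivery is a synchronous method call, and the timer is left out, so a node asks for a missing message as soon as it hears about its id.

```python
class PlumtreeNode:
    def __init__(self, name: str) -> None:
        self.name = name
        self.alive = True
        self.eager = set()   # peers we push full messages to
        self.lazy = set()    # peers we only tell about message ids
        self.messages = {}   # message id -> payload
        self.sent = 0        # full messages this node pushed

    def connect(self, other: "PlumtreeNode") -> None:
        self.eager.add(other)
        other.eager.add(self)

    def broadcast(self, msg_id: int, payload: str) -> None:
        self.messages[msg_id] = payload
        self._push(msg_id, came_from=None)

    def _push(self, msg_id: int, came_from) -> None:
        for peer in sorted(self.eager, key=lambda p: p.name):
            # The eager set can shrink while we iterate (a prune may arrive).
            if peer is came_from or peer not in self.eager:
                continue
            self.sent += 1
            peer.on_gossip(self, msg_id, self.messages[msg_id])

    def on_gossip(self, sender: "PlumtreeNode", msg_id: int, payload: str) -> None:
        if not self.alive:
            return
        if msg_id in self.messages:
            # Duplicate: "stop sending me these". The link turns lazy on both ends.
            self._demote(sender)
            sender._demote(self)
            return
        self.messages[msg_id] = payload
        self._push(msg_id, came_from=sender)

    def _demote(self, peer: "PlumtreeNode") -> None:
        self.eager.discard(peer)
        self.lazy.add(peer)

    def announce(self) -> None:
        """Periodic lazy push: tell lazy peers which message ids we hold."""
        for peer in sorted(self.lazy, key=lambda p: p.name):
            peer.on_ihave(self, set(self.messages))

    def on_ihave(self, sender: "PlumtreeNode", ids: set) -> None:
        if not self.alive:
            return
        for msg_id in sorted(ids - set(self.messages)):
            # Missing message: make the link eager again and fetch the payload.
            for a, b in ((self, sender), (sender, self)):
                a.lazy.discard(b)
                a.eager.add(b)
            self.on_gossip(sender, msg_id, sender.messages[msg_id])
```

With three fully connected nodes A, B and C, the first broadcast from A costs 3 messages and demotes the A to C link to lazy, and the second costs only 2. If B then goes down, C misses the next broadcast until A calls `announce()`, at which point C notices the missing id, pulls the message from A and the A to C link becomes eager again.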

There is also another important aspect of this behavior, detecting and circumventing slow nodes. If a node is slow to distribute messages to its peers, other nodes will notify those peers that those messages exist, and if that is the case, we’ll eventually move to a more efficient topology by routing around that slow node.

You can see a full visualization of that (and admire my rapidly improving UI skills) here. The JavaScript implementation of the algorithm is here.

Plumtree has a few weaknesses, mostly that it is optimized for a single source topology. In other words, the first node you start from will influence the optimization of the network, and if you start a broadcast from another node, it won’t be an optimal operation. That said, there are a few ways to handle that. The actual topology remains the same; what influences Plumtree is the rejection replies from nodes that say that the information it transmitted was already received. We can keep track of not only the nodes that rejected us, but also the root source of that rejection, so a message originating in E wouldn’t stop us from propagating a message originating in J.

Because Plumtree is meant for very large clusters (the paper talks about testing this with 10,000 nodes), and you might have a message originate from any one of those, you probably want to limit the notion of “origin”. If you track the past three nodes a message passed through, you get a reasonably small amount of data that you have to keep, and it is likely to be accurate enough to build multiple topologies that will optimize themselves based on actual conditions.

That is it for this post, I’ve got a couple more posts that I want to write about gossips, but that would be it for today.

Gossip much? Disseminating information among high number (10K) of nodes

Every once in a while, I like to sit down and read about what is going on outside my current immediate field of interest. This weekend, I chose to focus on efficient information dissemination with very large number of nodes.

The articles of interest for this weekend are HyParView and Epidemic Broadcast Trees (Plumtrees). They are a great read, and complement one another to a nice degree. HyParView is an algorithm that seeks to connect a (potentially very large) set of nodes together without trying to make each node connect to every other node. To simplify things, I’m going to talk about clusters of several dozen nodes, although both articles have been tested with up to 10,000 nodes and with failure rates of up to 95% of the network. This post is here so I can work out the details in my mind, it may be that I’m wrong, so don’t try to treat this as a definitive source.

Let us assume that we have a network with 15 nodes in it, and we want to add a new node. One way of doing that would be to maintain a list of all the nodes in the system (that is what admins are for, after all) and have the node connect to all the other nodes. In this way, we can communicate between all the nodes very easily. Of course, that means that the number of connections we have in a network of 16 (15 + new) nodes is 120. And that utterly ignores the notion of failure. But let us continue with this path, to see what unhappy landscape it is going to land us on.

We have a 15 node cluster, and we add a new node (so we have to give it all the other nodes), and it connects to all the other nodes and registers with them. So far, so good. Now, let us say that there is a state change that we want to send to all the nodes in the network. We can do that by connecting to a single node, and having it distribute this information to all the other nodes. Cost of this would be 16 (1 to talk to the first node, then 15 for it to talk to the rest). That is very efficient, and it is easy to prove that this is indeed the optimal way to disseminate information over the network (each node is only contacted once).
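The numbers above are easy to check. In a full mesh every pair of nodes shares one connection, so the count is n(n-1)/2, and the single hop broadcast costs one message per node. The function names here are only for illustration.

```python
def full_mesh_connections(node_count: int) -> int:
    """Number of links when every node connects to every other node."""
    return node_count * (node_count - 1) // 2


def single_hop_broadcast_cost(node_count: int) -> int:
    """One message to reach the first node, then one to each remaining node."""
    return 1 + (node_count - 1)


print(full_mesh_connections(16))       # 120
print(single_hop_broadcast_cost(16))   # 16
print(full_mesh_connections(100))      # 4950
```

The broadcast cost grows linearly, but the number of connections grows quadratically with the size of the cluster.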

In a 16 node network, maybe that is even feasible. It is a small cluster, after all. But that is a big maybe, and I wouldn’t recommend it. If we grow the cluster size to a 100 node cluster, that gives us 4,950(!) connections between all nodes, and the cost of sending a single piece of information is still the optimal N. But I think that it is easy to see that this isn’t the way to go about it. Mostly because you can’t do that, not even for the 16 node cluster. Usually when we talk about clusters we like to think about them as flat topologies, but that isn’t actually how it goes. Let us look at a better approximation of a real topology:


Yes, this isn’t quite right, but it is good enough for our purposes.

In this 16 node cluster, we have the green node, which is the one we initially contact to send some data to the entire cluster. What would happen if we tried to talk from that node to all the other nodes? Well, notice how much load it would place on the green node’s router, or the general cost for the network in the area of the green node. Because of that, a straight direct connection to the entire cluster is not something that you really want to do.

An alternative to that, assuming that you have a fixed topology, is to create a static tree structure, so you start with the green node, it then contacts three other nodes, who then each contact four other nodes. We still have the nice property that each node is only getting the new information once. But we can parallelize the communication and reduce the load on a single segment of the network.

Which is great, if we have a static topology and zero failures. In practice, none of those is true, so we want something else, and hopefully something that would make this a lot easier to handle. This is where HyParView comes into play. I sat down and wrote a whole big description of how HyParView works, but it wasn’t something that you couldn’t get from the article. And one of the things that I did along the way was create a small implementation in JavaScript and plug this into a graph visualization, so I could follow what is going on there.


That means that I had to implement the HyParView protocol in JavaScript, but it turned out to be a great way to actually explore how the thing works, and it ended up with great visualization.

You can see it in action in this url, and you can read the actual code for the HyParView protocol here.

Here is the cluster at 5 nodes, just after we added E:


And here it is at 9 nodes, after it had a chance to be stable.


Note that we have the connections (active view) from each node to up to 3 other nodes, but we also have other letters next to the node name, in []. That is the passive list, the list of nodes that we are not connected to, but will try if our connection to one of the nodes in the active list goes down.

In addition to just adding themselves to one of the nodes, the nodes will also attempt to learn the topology of the network in such a way that if there is a failure, they can recover from it. The JavaScript code I wrote is not good JavaScript code, that isn’t my forte, but it should be enough to follow what is going on there. We are able to do very little work to have a self organizing system of nodes that discover the network.
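To give a feel for the active / passive bookkeeping, here is a minimal sketch of the state a single node keeps. It is not the JavaScript implementation linked above and it leaves out the join, shuffle and heartbeat messages entirely; `PartialView`, its method names and the `is_reachable` callback are assumptions made for the example. The seeded RNG mirrors the predictable RNG used in the visualization.

```python
import random
from typing import Callable, Optional


class PartialView:
    """Hypothetical per-node state: a small active view plus a passive backup list."""

    def __init__(self, node_id: str, max_active: int = 3, seed: int = 0) -> None:
        self.node_id = node_id
        self.max_active = max_active
        self.active = set()
        self.passive = set()
        self.rng = random.Random(seed)  # seeded, so runs are repeatable

    def add_active(self, peer: str) -> None:
        if peer == self.node_id or peer in self.active:
            return
        if len(self.active) >= self.max_active:
            # Active view is full: demote a random member to the passive list.
            evicted = self.rng.choice(sorted(self.active))
            self.active.remove(evicted)
            self.passive.add(evicted)
        self.passive.discard(peer)
        self.active.add(peer)

    def add_passive(self, peer: str) -> None:
        if peer != self.node_id and peer not in self.active:
            self.passive.add(peer)

    def peer_failed(self, peer: str, is_reachable: Callable[[str], bool]) -> Optional[str]:
        """Drop a dead active peer and try passive candidates until one answers."""
        self.active.discard(peer)
        candidates = sorted(self.passive)
        self.rng.shuffle(candidates)
        for candidate in candidates:
            self.passive.discard(candidate)
            if is_reachable(candidate):
                self.active.add(candidate)
                return candidate
        return None
```

A node with active view B, C, D and passive list E, F that loses B will try E and F in random order and promote the first one that answers, which is the healing step the heartbeats drive in the visualization.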

Note that in large networks, none of the nodes would have the full picture of the entire network, but each node will have a partial view of it, and that is enough to send a message through the entire network. But I’m going to talk about this in another post.

In the meantime, go to this url and see it in action (the actual code for the HyParView protocol is here). Note that I've made the different actions explicit, so you need to do heartbeats (and the algorithm relies on them for healing failures) to get proper behavior for the system. I've also created a predictable RNG, so we can always follow the same path in our iterations.

Transactions are a figment of your imagination

This post is in response to a few comments here. In particular, I get the sense that people expect businesses and systems to behave as if transactions are a real thing. The only problem here is that this is quite far from the truth.

Let me define what I mean by a transaction here. I’m not talking about database transactions, ACID or any such thing. I’m talking about the notion that any interaction between a user and a business, or between two businesses can actually be modeled with the notion of a transaction similar to what we see in databases.

That is, that we have an interaction that would either be all there, or won’t be there at all. The most obvious example is the notion of a financial transaction, the notion that we debit an account and then we credit another account. And we have to do that in such a way that either both accounts were modified or none of them were modified. That is the classic example for database transactions, and it is wrong, as anyone who ever wrote a check or sent a wire money transfer can tell you. A good discussion on how that actually works can be found here. Note that in this case, the way money transfer works, in the real world, is that you upload a file over FTP, then wait three to five days to see if the orders you sent were rejected.

Another example is the notion of accepting an order, in a transactional manner. If I accepted your order, after verifying that I have reserved everything, and my warehouse burned down, what do I do with your order? I can hardly roll it back.

To move away from businesses, let us consider doing something as unimportant as voting in a national election. Logically speaking, this is a pretty simple process. Identify the voter, record their vote, total the votes, select winner. Except that you can go back and force a re-election in a particular district if such is needed, or you might find a box of lost votes, or any of a hundred evil little things that crop up in the real world.

Any attempt to model such systems in neat transactional boxes with “all in or none at all” is going to break.

Lies, Service Level Agreements, Trust and failure mores

I had a very interesting discussion with Kelly Sommers on Twitter. But 140 characters isn’t really enough to explain things. Also, it is an interesting topic in general.

Kelly disagreed with this post: http://www.shopify.ca/technology/14909841-kafka-producer-pipeline-for-ruby-on-rails


You can read the full discussion here.

The basic premise is, there is a highly reliable distributed queue that is used to process messages, but because they didn’t have operational experience with this, they used a local queue to store the messages before sending them over the network. Kelly seems to think that this is decreasing reliability. I disagree.

The underlying question is simple: when do you consider it acceptable to lose a message? If returning an error to the client is fine, sure, go ahead and do that if you can’t reach the cluster. But if you are currently processing a 10 million dollar order, that is going to kinda suck, and anything that you can do to reduce the chance of that happening is good. Note the key part in this statement, we can only reduce the chance of this happening, we can’t ensure it.

One way to try that is to get a guaranteed SLA from the distributed queue. Once we have that, we can rely that it works. This seems to be what Kelly is aiming at:


And that is true, if you could rely on SLAs. Just this week we had a multi hour, multi region Azure outage. In fact, outages, including outages that violate SLAs, are unfortunately common.

In fact, if we look at recent history, we have:

There are actually more of them, but I think that 5 outages in 2 years is enough to show a pattern.

And note specifically that I’m talking about global outages like the ones above. Most people don’t realize that complex systems operate in a constant mode of partial failure. If you ever read an accident investigation report, you’ll know that there is almost never just a single cause of failure. For example, the road was slippery and the driver was speeding and the ABS system failed and the road barrier foundation rotted since being installed. Even a single change in one of those would mitigate the accident from a fatal crash to “didn’t happen” or to “honey, we need a new car”.

You can try to rely on the distributed queue in this case, because it has an SLA. And Toyota also promises that your car won’t suddenly accelerate into a wall, but if you had a Toyota Camry in 2010… well, you know…

From my point of view, saving the data locally before sending over the network makes a lot of sense. In general, the local state of the machine is much more stable than the network. And if there is an internal failure in the machine, it is usually too hosed to do anything about anyway. I might try to write to disk, and write to the network even if I can’t do that, because I want to do my utmost to not lose the message.

Now, let us consider the possible failure scenarios. I’m starting all of them with the notion that I just got a message for a 10 million dollar order, and I need to send it to the backend for processing.

  1. We can’t communicate with the distributed queue. That can be because it is down, hopefully that isn’t the case, but from our point of view, if our node became split from the network, this has the same effect. We are writing this down to disk, so when we become available again, we’ll be able to forward the stored message to the distributed queue.
  2. We can’t communicate with the disk, maybe it is full, or there is an error, or something like that. We can still talk to the network, so we place it in the distributed queue, and we go on with our lives.
  3. We can’t communicate with the disk, we can’t communicate with the network. We can’t keep it in memory (we might overflow the memory), and anyway, if we are out of disk and network, we are probably going to be rebooted soon anyway. SOL, there is nothing else we can do at this point.

Note that the first case assumes that we actually do come back up. If the admin just blew this node away, then the data on that node isn’t coming back, obviously. But since the admin knows that we are storing things locally, s/he will at least try to restore the data from that machine.

We are safer (not safe, but more safe than without it). The question is whether this is worth it. If your messages aren’t actually carrying financial information, you can probably just drop a few messages as long as you let the end user know about that, so they can retry. If you really care about each individual message, if it is important enough to go the extra few miles for it, then the store and forward model gives you a measurable level of extra safety.
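The three failure scenarios above can be sketched as a tiny store and forward wrapper. This is not the Shopify code and not tied to any particular queue: `StoreAndForward`, the `spool` object (a list standing in for a disk backed queue) and the `send_to_queue` callback are all hypothetical, and I'm assuming that disk and network failures both surface as `OSError`.

```python
from typing import Callable


class StoreAndForward:
    def __init__(self, spool: list, send_to_queue: Callable[[bytes], None]) -> None:
        self.spool = spool                  # stands in for a disk backed local queue
        self.send_to_queue = send_to_queue  # stands in for the distributed queue client

    def _try_send(self, message: bytes) -> bool:
        try:
            self.send_to_queue(message)
            return True
        except OSError:  # ConnectionError and friends are subclasses of OSError
            return False

    def accept(self, message: bytes) -> str:
        # Write locally first, then try the network even if the local write failed.
        try:
            self.spool.append(message)
            stored = True
        except OSError:
            stored = False
        if self._try_send(message):
            if stored:
                self.spool.remove(message)  # delivered, no need to keep the local copy
            return "sent"
        # Scenario 1 if we managed to store it, scenario 3 if we did not.
        return "stored" if stored else "lost"

    def flush(self) -> int:
        """Forward spooled messages once the distributed queue is reachable again."""
        forwarded = 0
        while self.spool and self._try_send(self.spool[0]):
            self.spool.pop(0)
            forwarded += 1
        return forwarded
```

The only path that returns "lost" is the one where both the disk and the network fail at the same time, which is exactly the case where there is nothing else we can do.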

