Ayende @ Rahien


The business process of comparing the price of milk


A law recently came into effect in Israel that mandated all supermarket chains to publicize their full pricing data and keep it up to date. The idea is that doing so will allow users to easily compare prices and lower costs. That has led to some interesting complaints in the media.

“Evil corporations are publishing the data in an unreadable format instead of as accessible files”.

I was really amused by the unreadable format (pretty horrible XML, to tell you the truth) and the accessible files (Excel spreadsheets) definitions. But expecting the media to actually get something right is… flawed, probably.

At any rate, one of the guys in the office played around with the data, toying with the notion of creating a small app for that purpose. (Side note, there are currently at least three major efforts to build a price comparison app/site for this. One of them is by a client of ours, but aside from some modeling advice, we have no relation to it.)

So we sat down and talked about the kind of features that you would want from this kind of app. The first option, of course, is to define a cart of the products I want to buy, and then compare this cart across various chains and branches. (For each chain, there is a different price list, and prices also differ on a per branch basis.)

So far, that is easy, but that isn’t really what you want to do. The next obvious step would be to take into account sales and promotions. The easiest ones are sales such as 1+1 or 2+1. That requires an optimizing engine that can suggest getting another bottle of Coke to go with the two you already have (same price), or recommending the purchase of 10 packages of diapers (a non-perishable product, currently on a significant sale).

But this is really just the start. Sales and promotions are complex; a typical condition for a sale would be a whole chicken for 1 NIS (if you purchase more than 150 NIS total). Or maybe you get complimentary products, or… you get the point. You need to take into account all the various discounts you can get.
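To make the shape of the problem concrete, here is a minimal sketch of pricing a cart against a single branch with just a naive 1+1 promotion applied. Everything here (class names, the promotion model) is hypothetical; a real engine would also need the thresholds, club prices and coupons described next.

using System.Collections.Generic;

// Hypothetical sketch only: price a cart against one branch's price list,
// applying a naive "1+1" (buy one, get one free) promotion.
public class CartPricer
{
    private readonly Dictionary<string, decimal> prices;   // product id -> unit price in this branch
    private readonly HashSet<string> onePlusOne;           // products currently on a 1+1 sale

    public CartPricer(Dictionary<string, decimal> prices, HashSet<string> onePlusOne)
    {
        this.prices = prices;
        this.onePlusOne = onePlusOne;
    }

    public decimal PriceCart(Dictionary<string, int> cart)  // product id -> quantity
    {
        decimal total = 0;
        foreach (var line in cart)
        {
            var quantity = line.Value;
            if (onePlusOne.Contains(line.Key))
                quantity -= quantity / 2;                    // every second unit is free
            total += prices[line.Key] * quantity;
        }
        return total;
    }
}

Run the same cart through one of these per branch and pick the cheapest total, and you have the basic comparison feature. Everything that follows is about how quickly that simple model stops being enough.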

Speaking of discounts, that means that you now need to also consider not just static stuff, but also dynamic (per user). Maybe the user has a set of coupons that they can use, or maybe they have a club membership in a particular chain that has certain benefits (for example 3% discount on all the chain’s branded products and another 5% for any 5 other products preselected at the time you joined the club).

So you now need a recommendation engine that can run those kinds of projections and make suggestions based on them.

Oh, and wait! We also need to consider substitutions. If I purchased Bamba (a common peanut butter snack in Israel, and one of the only common uses of peanut butter there), I might actually want to get Shush, which is pretty much the same thing, only slightly less costly. Or maybe get Parpar, which is another replacement, and is even cheaper.

To make it easier for people not well versed in Israeli snacks, that means that we want to offer RC Cola or Pepsi Cola instead of Coca Cola, because they are cheaper.

Of course, some people swear that the taste is totally different, so they can’t be replaced. But does anyone really care what brand of cleaning product they use to clean the floor? The answer, by the way, is yes, people really do care about that kind of stuff. But even when you have a specific brand, maybe you can get the 2 gallon bottle of cleaning solution instead of the 1 gallon, because it ends up being cheaper.

This is just to outline the complexity inherent in this. To do this well, you need to do quite a lot. This is actually really interesting from a technical perspective. But this isn’t a post about the technical side.

Instead, let us talk about money. In particular, how to make money here. You can assume that selling the app wouldn’t work, not even a freemium model. But an app that guides people on where to buy? There are quite a lot of ways to make money here.

We’ll start with an interesting observation. If you are the gateway to Joe making money, that usually means that Joe is going to pay you. Something that is very obvious is club membership referral fees.

If I run your cart through my engine, and I can tell you “Dude, if you join chain Xyz’s club you can save 100 NIS today and 15,000 NIS this year, just click on this button to join”, that is a pretty compelling argument. And the chain would pay a certain fee for everyone we register for them. Another way to make money is to take the aggregated data and do stuff with it. Coming up with “chain Xyz is the cheapest” so they can use it in their ads is something that would be worth money.

Another way to do this is to run interference. Instead of going to the supermarket, we’ll just make that order for you, and it will be at your door in a few hours… and the app will make sure to always get it from the cheapest location.

There are also holidays, and they typically involve big purchases for the holiday dinners, etc. That means that we can build a cart for the holidays, check it with the various chains, then suggest buying it at chain Xyz. Of course, that was pre-negotiated so they would get the big sales (presumably we are ethical and make sure that that chain is really the cheapest).

Of course, all of this makes an interesting assumption: that the chains care about the price you pay them. They don’t really care about that, they care about their margins. That means that we can play on that. We can suggest a cheaper cart overall, but one that would be more profitable to the chain, because the products that we suggest have a lower price for the customer, but a higher margin for the chain.

And we totally left out the notion of coupons and discounts for actually using the app. “Scan this screen to get a 7% discount for Coca Cola”, for example. This way, the people paying us aren’t just the chains, but the manufacturers as well.

And coming back to the technical side of things. Consider targeted discounts. If I know what kind of products you want to buy, we can get all the way to customer-specific price listings. (I’ll sell you diapers at cost, and you’ll buy the store brand of toilet paper at the list price, instead of the usually discounted price, so the store’s profit is higher.)

Nitpicker corner: I have zero knowledge of retail beyond what is needed to actually purchase food. I have no idea if those options are valid, and this is purely a mental exercise. This is interesting because of that. There are some technical problems here (in the recommendations), but a lot more business problems (in the kind of partnerships and deals that you can make for the customers).

I didn’t spend a lot of time considering this, and I’m pretty sure that I missed quite a few options. But it is a good way to remember that most business problems have depth behind them, not just technical solutions.

The state of a failure condition


I’m looking over a bunch of distributed algorithm discussion groups, and I recently saw several people making the same bad assumption. The issue is that in a distributed system, you have to assume that any communication between systems can fail.

Because that is taken into account in any distributed algorithm, there is a school of thought that believes that errors shouldn’t generate replies. That is horrifying to me.

Let me give a concrete example. In the Raft algorithm, nodes will participate in an election in order to decide who is the leader. A node can decide to vote for a certain candidate, to reject a candidate, or it may be down and not responsive. Since we have to handle the non-responsive node anyway, it is easy to assume that we only need to reply to the candidate when we actually vote for it. After all, no reply is a negative reply already, no?

The issue with this design decision is that it is indeed correct, but it is also boneheaded*. There are two reasons here. The minor one is that a non-reply forces the candidate to wait until a pre-configured timeout happens, after which it can go into failure handling. But actually sending a reply when we know that we refuse to vote for a node gives that node more information, and cuts down the time it takes for it to react to negative replies.

As important as that is, this isn’t really my main concern. My main concern here is that not sending a reply leaves the administrator trying to figure out what is going on with essentially zero data. On the other hand, if the node sends a “you are missing X, Y and Z for me to consider you applicable” reply, that is something that can be traced, that can be shown and acted upon.
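As an illustrative sketch (the Reason field here is my addition for the example, not part of the standard Raft vote reply), the response can carry enough information to be logged and acted upon:

// Illustrative only: a vote reply that tells the candidate *why* it was rejected.
public class RequestVoteResponse
{
    public long Term { get; set; }         // responder's current term (standard Raft field)
    public bool VoteGranted { get; set; }  // standard Raft field
    public string Reason { get; set; }     // e.g. "already voted for another node in this term"
}

On the candidate’s side, a negative reply can be logged immediately and surfaced to the operator, instead of silently waiting for the election timeout to expire.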

It may seem like a small thing, overall, but it is something with crucial importance for operations. Those are hard enough when you have a single node. When you have a distributed system, you have to plan for that explicitly.

* I am using this terminology intentionally. Anyone who doesn’t consider production support and monitoring for their software from the get go has never had to support complex production systems, where every nugget of information can be crucial.

That ain’t going to take you anywhere


As part of our usual work routine, we field customer questions and inquiries. A pretty common one is to take a look at their system to make sure that they are making good use of RavenDB.

Usually, this involves going over the code with the team, and making some specific recommendations. Merge those indexes, re-model this bit to allow for this widget to operate more cleanly, etc.

Recently we had such a review in which what I ended up saying was: “Buy a bigger server, hope this works, and rewrite this from scratch as fast as possible”.

The really annoying thing is that someone who was quite talented has obviously spent a lot of time doing a lot of really complex things to end up where they are now. It strongly reminded me of this image:

[image]

At this point, you can have the best horse in the world, but the only thing that will happen if it runs is that you are going to be messed up.

What was so bad? Well, to start with, the application was designed to work with a dynamic data model. That is probably also why RavenDB was selected, since that is a great choice for dynamic data.

Then the designers sat down and created the following system of classes:

// A "table" definition stored as a document: the schema itself is data
public class Table
{
	public Guid TableId {get;set;}
	public List<FieldInformation> Fields {get;set;}
	public List<Reference> References {get;set;}
	public List<Constraint> Constraints {get;set;}
}

// A single field in that schema
public class FieldInformation
{
	public Guid FieldId {get;set;}
	public string Name {get;set;}
	public string Type {get;set;}
	public bool Required {get;set;}
}

// A foreign-key style link between "tables"
public class Reference
{
	public Guid ReferenceId {get;set;}
	public string Field {get;set;}
	public Guid ReferencedTableId {get;set;}
}

// An actual record: a bag of field ids and string values
public class Instance
{
	public Guid InstanceId {get;set;}
	public Guid TableId {get;set;}
	public List<Guid> References {get;set;}
	public List<FieldValue> Values {get;set;}
}

public class FieldValue
{
	public Guid FieldId {get;set;}
	public string Value {get;set;}
}

I’ll let you draw your own conclusions about what the documents looked like, or just how many calls you needed to load a single entity instance.

For that matter, it wasn’t possible to query such a system directly, obviously, so they created a set of multi-map/reduce indexes that took this data and translated that into something resembling a real entity, then queried that.

But the number of documents and indexes, and the sheer travesty going on, meant that in practice:

  • Saving something to RavenDB took a long time.
  • Querying was really complex.
  • The number of indexes was high.
  • Just figuring out what is going on in the system was nigh impossible without a map, a guide and a lot of good luck.

Just to cap things off, this is a .NET project, and in order to connect to RavenDB they used direct REST calls using HttpClient, blithely ignoring all the man-decades that were spent in creating a good client side experience and integration. For example, they made no use of Etags or Not-Modified-Since, so a lot of the things that RavenDB can do (even under such… hardship) to make things better weren’t supported, because the client code wouldn’t cooperate.
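For contrast, here is roughly what the idiomatic client usage they skipped looks like. This is a sketch against the RavenDB 2.x/3.x era .NET client; exact API details vary between versions, and the Order class is just an example.

using Raven.Client;
using Raven.Client.Document;

public class Order { public string Id { get; set; } public string Status { get; set; } }

public static class ClientSketch
{
    public static void Main()
    {
        // Sketch: the official client gives you caching (ETags / 304 Not Modified),
        // a unit of work and batched writes without any extra code on your side.
        using (IDocumentStore store = new DocumentStore
        {
            Url = "http://localhost:8080",
            DefaultDatabase = "Shop"
        }.Initialize())
        using (var session = store.OpenSession())
        {
            var order = session.Load<Order>("orders/1");  // may be served from the client's cache
            order.Status = "Shipped";
            session.SaveChanges();                        // one round trip for all pending changes
        }
    }
}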

I don’t generally say things like “throw this all away”, but there is no mid or long term approach that could possibly work here.

Linux, Debts and Out Of Memory Killer


Imagine that you go to the bank, and ask for a $100,000 mortgage. The nice guy in the bank agrees to lend you the money, and since you need to pay that in 5 installments, you take $15,000 to the contractor, and leave the rest in the bank until it is needed. The bank is doing brisk business, and promises a lot of customers that they can get their mortgages at the bank. Since most of the mortgages are also taken in installments, the bank never actually has enough money to hand over to all the borrowers. But it makes do.

Until one dark day when you come to the bank and ask for the rest of the money, because it is time to install the kitchen cabinets, and you need to pay for that. The nice man in the bank tells you to wait a bit, and goes to see if they have any money. At this point, it would be embarrassing to tell you that they don’t have any money to give you, because they over committed themselves. So the nice man from the bank murders you and buries your body in the desert, to avoid you complaining that you didn’t get the money that you were promised. Actually, the nice man might go ahead and kill someone else (robbing them in the process), and then give you their money. You go home happy to your blood stained kitchen cabinets.

That is how memory management works in Linux.

After this dramatic opening, let us get down to what is really going on. Linux has a major problem. Its process model means that it is stuck up a tree and the only way down is via free fall. Whenever a process wants to create another process, the standard method in Linux is to call fork() and then call execv() to execute the new binary. The problem here is what fork() does. It needs to copy the entire process state to the new process. That includes all memory, handles, registers, etc.

Let us assume that we have a process that allocated 1GB of memory for reading and writing, and then called fork(). The way things are set up, it is pretty cheap to create the new process, all we need to do is duplicate the kernel data structures and we are done. However, what happens to the memory that the process allocated? The fork() call requires that both processes will have access to that memory, and also that both of them may modify it. That means that we have a copy on write situation. Whenever one of the processes modifies the memory, it forces the OS to copy that piece of memory to another physical memory location and remap the virtual addresses.

This allows the developer to do some really cool stuff. Redis implemented its backup strategy via the fork() call. By forking and then dumping the in-memory process state to disk, it can get a consistent snapshot of the system with almost no code. It is the OS that is responsible for maintaining that invariant.

Speaking of invariants, it also means that there is absolutely no way that Linux can manage memory properly. If we have 2 GB of RAM on the machine, and we have a 1GB process that fork()-ed, what is going to happen? Well, it was promised 1 GB of RAM, and it got that. And it was also promised by fork() that both processes will be able to modify the full 1GB of RAM. If we also have some other processes taking memory (and assuming no swap for the moment), that pretty much means that someone is going to end up holding the dirty end of the stick.

Now, Linux has a configuration option that would prevent this (vm.overcommit_memory = 2, together with the overcommit ratio, but that isn’t really important; I’m including it here for the nitpickers, and yes, I’m aware that you can set oom_adj = –17 to protect a process from this issue, that’s not the point). This tells Linux that it shouldn’t over commit. In such cases, the fork() call would fail, and you’ll be left with an effectively crippled system. So, we have the potential for a broken invariant. What is going to happen now?

Well, Linux promised you memory, and after exhausting all of the physical memory, it will start paging to the swap file. But that can be exhausted too. That is when the Out Of Memory Killer gets to play; it takes an axe and starts choosing a likely candidate to be mercilessly murdered. The “nice” thing about this is that there is no control over it, and you might be a perfectly well behaved process that the OOM killer just doesn’t like this Monday, so buh-bye!
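For the record, the knobs mentioned above look roughly like this (illustrative values only; check your distribution’s defaults before changing anything):

# /etc/sysctl.conf - illustrative values only
vm.overcommit_memory = 2    # refuse allocations beyond the commit limit instead of overcommitting
vm.overcommit_ratio = 80    # commit limit = swap + 80% of physical RAM

# Per process, you can also make something a much less likely OOM killer target:
#   echo -1000 > /proc/<pid>/oom_score_adj    (newer interface; oom_adj is the legacy one)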

Looking around, it seems that we aren’t the only ones who have run head first into this issue. The Oracle recommendation is to set things up to panic and reboot the entire machine when this happens, and that seems… unproductive.

The problem is that as a database, we aren’t really in control of how much we allocate, and we rely on the system to tell us when we do too much. Linux has no facility to do things like warn applications that memory is low, or even to let us know by refusing to allocate more memory. Both are things that we already support handling, and they would be very helpful.

That is quite annoying.

.NET Packaging mess


In the past few years, we had:

  • .NET Full
  • .NET Micro
  • .NET Client Profile
  • .NET Silverlight
  • .NET Portable Class Library
  • .NET WinRT
  • Core CLR
  • Core CLR (Cloud Optimized)*
  • MessingWithYa CLR

* Can’t care enough to figure out if this is the same as the previous one or not.

In each of those cases, they offered similar, but not identical, APIs and options. That is completely ignoring the versioning side of things, where we have .NET 2.0 (1.0 finally died a while ago), .NET 3.5, .NET 4.0 and .NET 4.5. I don’t think that something can be done about versioning, but the packaging issue is painful.

Here is a small example why:

[image]

In each case, we need to subtly tweak the system to accommodate the new packaging option. This is pure additional cost to the system, with zero net benefit. Each time that we have to do that, we add a whole new dimension to the testing and support matrix, leaving aside the fact that the complexity of the solution is increasing.

I wouldn’t mind it so much, if it weren’t for the fact that a lot of those are effectively drive-bys, it feels. Silverlight took a lot of effort, and it is dead. WinRT took a lot of effort, and it is effectively dead.

This adds a real cost in time and effort, and it is hurting the platform as a whole.

Now users are running into issues with the Core CLR not supporting stuff that we use. So we need to rip out MEF from some of our code, and implement it ourselves just to get things in the same place as before.

Excerpts from the RavenDB Performance team report: Expensive headers, and cache effects


This ended up being pretty obvious, in retrospect. We noticed in the profiler that we spent a lot of time working with headers. Now, RavenDB is using REST as the communication layer, so it is doing a lot with that, but we should be able to do better.

Then Tal dug into the actual implementation and found:

public string GetHeader(string key)
{
	if (InnerHeaders.Contains(key) == false)
		return null;
	return InnerHeaders.GetValues(key).FirstOrDefault();
}

public List<string> GetHeaders(string key)
{
	if (InnerHeaders.Contains(key) == false)
		return null;
	return InnerHeaders.GetValues(key).ToList();
}


// Note: every access to this property rebuilds and copies the entire header collection
public HttpHeaders InnerHeaders
{
	get
	{
		var headers = new Headers();
		foreach (var header in InnerRequest.Headers)
		{
			if (header.Value.Count() == 1)
				headers.Add(header.Key, header.Value.First());
			else
				headers.Add(header.Key, header.Value.ToList());
		}

		if (InnerRequest.Content == null)
			return headers;

		foreach (var header in InnerRequest.Content.Headers)
		{
			if (header.Value.Count() == 1)
				headers.Add(header.Key, header.Value.First());
			else
				headers.Add(header.Key, header.Value.ToList());
		}

		return headers;
	}
}

To be fair, this implementation was created very early on, and no one ever actually spent any time looking at it since (why would they? it worked, and quite well). The problem is the number of copies that we have, and the fact that to pull a single header, we have to copy all the headers, sometimes multiple times. We replaced this with code that wasn’t doing stupid stuff, and we couldn’t even find the cost of working with headers in the profiler any longer.
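The replacement looked roughly like this (a sketch of the approach, not the exact code that went in): look up just the header you need, directly against the request, with no intermediate copies.

// Sketch of the fix: a direct lookup against the request (and content) headers,
// instead of copying the entire header collection on every call.
public string GetHeader(string key)
{
	IEnumerable<string> values;
	if (InnerRequest.Headers.TryGetValues(key, out values))
		return values.FirstOrDefault();
	if (InnerRequest.Content != null &&
		InnerRequest.Content.Headers.TryGetValues(key, out values))
		return values.FirstOrDefault();
	return null;
}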

But that brings up a really interesting question. How could we not know about this sort of thing? I mean, this isn’t the first time that we are doing a performance pass on the system. So how come we missed this?

The answer is that in this performance pass, we are doing something different. Usually we perf-test RavenDB as you would when using it on your own systems. But for the purpose of this suite of tests, and in order to find more stuff that we can optimize, we are actually working with a stripped down client, no caching, and no attempt to optimize things across the board. In fact, we have put RavenDB in the worst possible situation, all new work, and no chance to do any sort of optimization, and then we started seeing how all of those code paths that were rarely hit light up quite nicely.

Gossip much? Use cases and bad practices for gossip protocols


My previous few posts have talked about specific algorithms for gossip protocols, specifically HyParView and Plumtrees. They dealt with the technical behavior of the system, the process in which we are sending data over the cluster to all the nodes. In this post, I want to talk a bit about what kind of messages we are going to send in such a system.

The obvious one is to try to keep the entire state of the system up to date using gossip. So whenever we make a change, we gossip about it to the entire network, and we are able to get to an eventually consistent system in which all nodes have roughly the same data. There is one problem with that: you now have a lot of nodes with the same data on them. At some point, that stops making sense. Usually gossip is used when you have a large group of servers, and just keeping all the data on all the nodes is not a good idea unless your data set is very small.

So you don’t do that.  Gossip is usually used to disseminate a small data set, one that can fit comfortably inside a single machine (usually it is a very small data set, a few hundred MB at most). Let us consider a few types of messages that would fit in a gossip setting.

The obvious example is the actual topology of the whole network. A node joining the cluster will announce its presence, and that will percolate to the entire cluster, eventually. That can give you an idea (note, this isn’t a certainty) of what the structure of the cluster is, and maybe let you make decisions based on it.

The system wide configuration data is also a good candidate for gossip; for example, you can use gossip as a distributed service locator in the cluster. Whenever a new SMTP server comes online, it announces itself via gossip to the cluster. It is added to the list of SMTP servers in all the nodes that heard about it, and then it gets used. In this kind of system, you have to take into account that servers can be down for a long period of time, and miss out on messages. Gossip does not guarantee that the messages will arrive, after all. Oh, it is going to do its best, but you need to also build an anti-entropy system. If a server finds that it missed out on too much, it can request one of its peers to send it a full snapshot of the current global state as that peer knows it.

Going in the same vein, nodes can gossip about the health state of the network. If I’m trying to send an email via an SMTP server, and it is down, I’m going to try another server, and let the network know that I’ve failed to talk to that particular server. If enough nodes fail to communicate with the server, that becomes part of the state of the system, so nodes that learned about it can avoid that server for a period of time.

Moving in a different direction, you can also do gossip queries, which can be done by sending a gossip message across the cluster with a specific query in it. A typical example might be “which node has 10GB free that I can use?”. Such queries typically carry a timeout element with them. You send the query, and any matches are sent back to you (either directly or via gossip). After a predefined timeout, you can assume that you got all the replies you are going to get, so you can operate on that. More interesting is when you want to query for the actual data held in each node. If we want to find all the users who logged in today, for example.

The problem with doing something like that is that you might have a large result set, and you’ll need to have some way to work with it. You don’t want to send it all to a single destination, and what would you do with it, anyway? For that reason, most of the time gossip queries are actually aggregations. We can use that to get an estimate of certain things in our cluster. If we wanted to get the number of users per country, that would be a good candidate for this, for example. Note that you won’t necessarily get accurate results if you have failures, so there are aggregation methods for getting a good probability of the actual value.

For fun, here is an interesting exercise. Look at trending topics in a large number of conversations. In this case, whenever you get a new message, you analyze the topics for this message, and periodically (every second, let us say), you gossip to your peers about it. In this case, we don’t just blindly pass the gossip between nodes. Instead, we’ll use a slightly different method. Each second, every node will contact its peers to send them the current trending topics in the node. Each time the trending topics change, a version number is incremented. In addition, the node also sends its peers the node ids and versions of the messages it got from other nodes. The peer, in reply, will send a confirmation about all the node ids and versions that it has. So the origin node can fill in any new information that it got, or ask to get updates for information that it doesn’t have.
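To illustrate, the per-second exchange described above might use message shapes along these lines (all the names here are made up for the example):

using System.Collections.Generic;

// Illustrative message shapes for the periodic trending-topics exchange.
public class TrendingDigest
{
    public string SenderId { get; set; }
    public long OwnVersion { get; set; }                        // bumped whenever our topics change
    public List<string> OwnTrendingTopics { get; set; }
    public Dictionary<string, long> KnownVersions { get; set; } // node id -> latest version we have seen
}

public class TrendingDigestReply
{
    // Entries the sender was missing, keyed by node id.
    public Dictionary<string, TrendingEntry> Updates { get; set; }
    // Node ids for which the sender has newer data than us, so it can push them back.
    public List<string> WantNewerFrom { get; set; }
}

public class TrendingEntry
{
    public long Version { get; set; }
    public List<string> Topics { get; set; }
}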

This reduces the number of updates that flow throughout the cluster, while still maintaining an eventually consistent model. We’ll be able to tell, from each node, what the current trending topics are globally.

Gossip much? Operating with partial information, and getting the right result.


Unlike the previous two posts, this is going to be short. Primarily because what I want to talk about is what impresses me most with both HyParView and Plumtree. The really nice thing about them is that they are pretty simple, easy to understand and produce good results.

But the fun part, and what makes it impressive, is that they manage to achieve that with a small set of simple rules, and without any attempt to create a global view. They operate just fine with a potentially very small subset of the data overall, but still manage to operate, self-optimize and get to the correct result. In fact, I made some very minor attempts to play with this at large scale, and we saw some pretty amazing emergent behavior. Without anyone knowing what is going on globally, we are able to get to the optimal number of interactions in the cluster to distribute information.

That is really pretty cool.

And because this post is too short, I’ll leave you with a question. Given that you have this kind of infrastructure, what would you do with it? What sort of information or operations would you try to send using this way?

Gossip much? The gossip epidemic and other issues in polite society


In my previous post, I talked about the Hybrid Partial View protocol, and showed a visualization of how it actually works. Something that is important to note about this protocol is that it is mostly meant to create a gossip topology that is resilient to failure. It is not meant to actually send messages; it is meant to serve as the backbone topology (the peer sampling service) for figuring out what the nodes are.

The reason for that can be seen in the following 10 node cluster (after running heartbeats enough times to get to a stable state):

[image]

Let us assume that we want to disseminate a message across the cluster. We select node F as our root, and then send a message. The rules are as follows:

  • Each node sends the message to all its active connections (except the sender, of course).
  • A node that got the same message twice will ignore the message.

Based on those rules, and the topology above, we’re going to have the following chain of messages:

  • F – initial broadcast
  • F -> E, G, J
  • E -> G, I
  • G -> E, H
  • J -> I, H, J
  • H -> C, I
  • C -> B, A
  • B -> D, A
  • A -> B, D
  • D -> A

The total number of messages passed is 20. Which is twice as much as the optimal solution would generate.

What is worse, this is a very small network, and as the network grows, so will the number of redundant messages. This approach (called eager gossiping) has a major advantage: because it traverses all paths in the graph, it will also traverse all the shortest paths. That means that the time to get a message from the origin to all the nodes is the smallest possible, but the number of operations is high.

The Plumtree paper (Epidemic Broadcast Trees) presents a solution to this problem. It tries to minimize the number of messages while still maintaining reliability, optimizing both the number of messages that are passed and the distance they have to travel.

The way Plumtree works is explained in quite beautiful detail in the paper, but the underlying idea goes like this: we start using the same approach as the eager gossiping, but whenever we get a message that we already got, we reply to the source and tell it to stop sending us further messages. This is done so the next time that a message is sent, we can skip the known duplicate path, and reduce the number of overall messages that we have.
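In sketch form (this is illustrative code, not the paper’s pseudo-code; seenMessages, eagerPeers and lazyPeers are assumed per-node state, and SendPrune, SendGossip and Deliver are hypothetical helpers), the receive path looks roughly like this:

// Illustrative sketch of the eager push with pruning on duplicates.
public void OnGossipReceived(string fromPeer, Guid messageId, byte[] payload)
{
    if (seenMessages.Add(messageId) == false)
    {
        // Duplicate: tell the sender to stop eagerly pushing to us,
        // and demote that link to lazy on our side as well.
        SendPrune(fromPeer);
        eagerPeers.Remove(fromPeer);
        lazyPeers.Add(fromPeer);
        return;
    }

    Deliver(payload);

    // First time we see the message: keep pushing it along our remaining eager links.
    foreach (var peer in eagerPeers)
    {
        if (peer != fromPeer)
            SendGossip(peer, messageId, payload);
    }
}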

So the first run is going to generate 20 messages on the network. The second is going to generate just 13, you can see the non traversed paths in the following image:

[image]

Note that we didn’t pass any messages between I and J, or D and A. But a lot of the saving was achieved by avoiding duplicate notifications. So node I notified node H, but not vice versa. The next time we’ll run this, we have exactly 10 messages passing:

[image]

Now, obviously this is pretty cool, but that is under a stable state. What happens when there are failures? Well, at that point, the notion of lazy vs. eager peers comes into play. One of the things we did initially was to clear the duplicate paths in the network, so we could optimize the number of messages being passed. That is pretty cool, but it also leaves us vulnerable to failures. For example, imagine that node H is down. What happens then?

There are two aspects of this that are interesting. Plumtrees only care about the notion of message passing. They don’t deal with topology changes. In this case, the responsibility to join the different parts of the network lies with the peer sampling service, which is HyParView in this case. That would figure out the communication issue, and forge new connections with the remaining nodes. Plumtree will get notified about that, and the process continues.

But let us leave that one aside, and say that we have a static topology; how would Plumtree handle this? Well, at this point you have to realize that Plumtree doesn’t just drop a connection when a node tells it that it already heard about a message. It just moves it to a lazy state. Periodically, a node will contact other nodes which told it that it wasn’t needed and tell them: “Hi, I got messages with ids (43,41,81), do you have them?”. In this way, a node whose contact point went down would become aware that there are missing messages. At that point, it starts a timer, and if it doesn’t hear about those missing messages, it will ask the node that told it about those messages to send them over, and initiate an active link. The end result here is that we send additional messages, but those tend to be pretty small, just the message ids.
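The recovery side, again as an illustrative sketch with hypothetical helpers (StartTimer, SendGraft) and the same assumed state, looks roughly like this:

// Illustrative sketch: reacting to a lazy "I have message X" announcement.
public void OnLazyAnnouncement(string fromPeer, Guid messageId)
{
    if (seenMessages.Contains(messageId))
        return; // steady state: we already have it, nothing to do

    // We only heard the id, not the message itself. If it doesn't arrive
    // through an eager link before the timer fires, ask this peer for it
    // and promote the link back to eager (a graft).
    StartTimer(TimeSpan.FromSeconds(1), () =>
    {
        if (seenMessages.Contains(messageId) == false)
        {
            SendGraft(fromPeer, messageId);
            lazyPeers.Remove(fromPeer);
            eagerPeers.Add(fromPeer);
        }
    });
}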

During steady state, we’ll just ignore those messages, but if there is a failure, they can help us recover from errors by letting us know that there are messages that we are missing, and taking action to recover that.

There is also another important aspect of this behavior, detecting and circumventing slow nodes. If a node is slow to distribute messages to its peers, other nodes will notify those peers that those messages exist, and if that is the case, we’ll eventually move to a more efficient topology by routing around that slow node.

You can see a full visualization of that (and admire my rapidly improving UI skills) here. The JavaScript implementation of the algorithm is here.

Plumtree has a few weaknesses, mostly that it is optimized for a single-source topology. In other words, the first node you start from will influence the optimization of the network, and if you start a broadcast from another node, it won’t be an optimal operation. That said, there are a few ways to handle that. The actual topology remains the same; what influences Plumtree are the rejection replies from nodes that say that the information it transmitted was already received. We can keep track of not only the nodes that rejected us, but also the root source of that rejection, so a message originating in E wouldn’t stop us from propagating a message originating in J.

Because Plumtree is meant for very large clusters (the paper talks about testing this with 10,000 nodes), and you might have a message originate from any one of those, you probably want to limit the notion of “origin”. If you track the past three nodes a message passed through, you get a reasonably small amount of data that you have to keep, and it is likely to be accurate enough to build multiple topologies that will optimize themselves based on actual conditions.

That is it for this post, I’ve got a couple more posts that I want to write about gossips, but that would be it for today.

Gossip much? Disseminating information among high number (10K) of nodes


Every once in a while, I like to sit down and read about what is going on outside my current immediate field of interest. This weekend, I chose to focus on efficient information dissemination with very large number of nodes.

The articles of interest for this weekend are HyParView and Epidemic Broadcast Trees (Plumtrees). They are a great read, and complement one another to a nice degree. HyParView is an algorithm that seeks to connect a set (of a potentially very large number) of nodes together without trying to make each node connect to every other node. To simplify things, I’m going to talk about clusters of several dozen nodes; the algorithms have both been tested with 10,000 nodes and with failure rates of up to 95% of the network. This post is here so I can work out the details in my mind; it may be that I’m wrong, so don’t try to treat this as a definitive source.

Let us assume that we have a network with 15 nodes in it. And we want to add a new node. One way of doing that would be to maintain a list of all the nodes in the system (that is what admins are for, after all) and have the node connect to all the other nodes. In this way, we can communicate between all the nodes very easily. Of course, that means that the number of connections we have in a network of 16 (15 + the new one) nodes is 120. And that utterly ignores the notion of failure. But let us continue with this path, to see what unhappy landscape it is going to land us on.

We have a 15 node cluster, and we add a new node (so we have to give it all the other nodes), and it connects to all the other nodes and registers with them. So far, so good. Now, let us say that there is a state change that we want to send to all the nodes in the network. We can do that by connecting to a single node, and having it distribute this information to all the other nodes. The cost of this would be 16 (1 to talk to the first node, then 15 for it to talk to the rest). That is very efficient, and it is easy to prove that this is indeed the most optimal way to disseminate information over the network (each node is only contacted once).

In a 16 node network, maybe that is even feasible. It is a small cluster, after all. But that is a big maybe, and I wouldn’t recommend it. If we grow the cluster size to a 100 node cluster, that gives us about 4,950(!) connections between all nodes, and the cost of sending a single piece of information is still the optimal N. But I think it is easy to see that this isn’t the way to go about it. Mostly because you can’t do that, not even for the 16 node cluster. Usually when we talk about clusters we like to think about them as flat topologies, but that isn’t actually how it goes. Let us look at a better approximation of a real topology:

[image]

Yes, this isn’t quite right, but it is good enough for our purposes.

In this 16 node cluster, we have the green node, which is the one we initially contact to send some data to the entire cluster. What would happen if we tried to talk from that node to all the other nodes? Well, notice how much load it would place on the green node’s router, or the general cost for the network in the area of the green node. Because of that, a straight-on direct connection to the entire cluster is not something that you really want to do.

An alternative, assuming that you have a fixed topology, is to create a static tree structure: you start with the green node, it then contacts three other nodes, which then each contact four other nodes. We still have the nice property that each node only gets the new information once. But we can parallelize the communication and reduce the load on a single segment of the network.

Which is great, if we have a static topology and zero failures. In practice, none of those is true, so we want something else, and hopefully something that would make this a lot easier to handle. This is where HyParView comes into play. I sat down and wrote a whole big description of how HyParView works, but it wasn’t something that you couldn’t get from the article. And one of the things that I did along the way was create a small implementation in JavaScript and plug this into a graph visualization, so I could follow what is going on there.

 

That means that I had to implement the HyParView protocol in JavaScript, but it turned out to be a great way to actually explore how the thing works, and it ended up with great visualization.

You can see it in action in this url, and you can read the actual code for the HyParView protocol here.

Here is the cluster at 5 nodes, just after we added E:

[image]

And here it is at 9 nodes, after it had a chance to be stable.

[image]

Note that we have connections (the active view) from each node to up to 3 other nodes, but we also have other letters next to the node name, in []. That is the passive list: the list of nodes that we are not connected to, but will try to connect to if our connection to one of the nodes in the active list goes down.

In addition to just adding themselves to one of the nodes, the nodes will also attempt to learn the topology of the network in such a way that if there is a failure, they can recover from it. The JavaScript code I wrote is not good JavaScript, that isn’t my forte, but it should be enough to follow what is going on. With very little work, we get a self-organizing system of nodes that discover the network.
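To make the two views concrete, here is roughly the per-node state, sketched in C# rather than the demo’s JavaScript (the active view cap of 3 matches the visualization above; everything else, including TryConnect, is illustrative):

using System.Collections.Generic;
using System.Linq;

// Sketch of the state each HyParView node keeps: a small active view of live
// connections, and a larger passive view of fallback candidates.
public class HyParViewNode
{
    public string NodeId { get; set; }
    public HashSet<string> ActiveView { get; set; }   // open connections (capped at 3 in the demo)
    public HashSet<string> PassiveView { get; set; }  // known nodes we are not connected to

    public void OnActivePeerFailed(string failedPeer)
    {
        ActiveView.Remove(failedPeer);

        // Promote a passive peer so the active view stays full.
        foreach (var candidate in PassiveView.ToList()) // copy, so we can mutate the set
        {
            if (TryConnect(candidate))
            {
                PassiveView.Remove(candidate);
                ActiveView.Add(candidate);
                break;
            }
        }
    }

    private bool TryConnect(string peer)
    {
        // Placeholder for actually dialing the peer; always succeeds in this sketch.
        return true;
    }
}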

Note that in large networks, none of the nodes would have the full picture of the entire network, but each node will have a partial view of it, and that is enough to send a message through the entire network. But I’m going to talk about this in another post.

In the meantime, go to this url and see it in action (the actual code for the HyParView protocol is here). Note that I've made the different actions explicit, so you need to run heartbeats (the algorithm relies on them for healing failures) to get proper behavior from the system. I've also created a predictable RNG, so we can always follow the same path in our iterations.
