Ayende @ Rahien

It's a girl

Reading Habits

I’m reading a lot, and I thought that I would post a bit about my favorite subjects. I decided to summarize this year with great books that don’t really fall into standard categories, which I really enjoyed.

The AlterWorld – By a Russian author, with a great Russian background (the part about how to identify a Russian was great), and the books are really good. The premise is that you can get stuck in an MMORPG, and it is beautifully done. Unlike in a regular fantasy book, the notion of levels and of gaining strength and power is really nice, especially since the hero isn’t actually taking the direct path to that. There is also a lot of interaction with the real world, and in general, this is a fully featured universe that is really good. It looks like there are going to be 3 more books, which is absolutely wonderful from my point of view.

AlterWorld The Clan The Duty

Those books were good enough that I started playing RPGs again, just because it was so much fun reading the status messages in the books. If you know of other books in the same space, I would love to know about them.

NPCs tells the tale from the point of view of Non Player Characters, which is quite interesting and done in a very believable way.

NPCs

Caverns & Creatures is a series of books (I lost count, there are a lot of short stories as well as full length books) that deals with the idea of people getting stuck in an RPG world. This one is mostly meant for humor’s sake, I think. It does get to toilet level humor all too frequently, but it is entertaining.

Critical Failures

Waldo Rabbit tells the tale of a guy that really tries to be an evil overlord, but his idea of a scary beast is a… rabbit. It is really well written, and I’m looking forward to the next book.

The (sort of) Dark Mage (Wa... After The Rabbit (Waldo Rab...

Wizard 2.0 talks about finding proof that the entire world is a computer simulation, and what happens when certain people find out about it. My guess is that this was written by a programmer, because the parts where they talk about software and programming weren’t made up out of whole cloth and didn’t piss me off at all. This is also a really good series, and I’m looking forward to reading the 3rd book. I especially liked that there isn’t some big Save The World theme going on, this is just life as you know it, if you are a bunch of pixels.

Off to Be the Wizard (Magic... Spell or High Water (Magic ... An Unwelcome Quest (Magic 2...

Velveteen is a “superhero” novel, but a very different one than the usual one. I’m not really sure how to categorize it, but it was a really great read.

Velveteen vs. The Junior Su... Velveteen vs. The Multivers...

Daniel Black is a single book series so far, with a second book, Black Coven, set to follow Fimbulwinter. It is a great book, with a very well written background and story. What is more, the hero doesn’t rely on brute force or on the author to rescue him when he stupidly gets into trouble. He thinks and plans, and that is quite great to read. I’m eagerly waiting for the next book.

Fimbulwinter (Daniel Black #1)

Published at

Originally posted at

Minimal incremental backups improvements in Voron

Voron uses a fairly standard journaling system to ensure ACID compliance. Whenever you commit a transaction, we write the changed pages to the log.

Let us take the simplest example, here is a very simple page:

image

Now we write the following code:

for (var i = 0; i < 3; i++)
{
    using (var tx = env.NewTransaction(TransactionFlags.ReadWrite))
    {
        var users = tx.ReadTree("users");
        users.Add("Users/" + NextUserId(), GetUserData());
        tx.Commit();
    }
}

After executing this code, the page is going to look like this:

image

But what is going to happen in the log? We have to commit each transaction separately, so what we end up with is the following journal file:

image

The log contains 3 entries for the same page, which makes sense, because we changed it in 3 different transactions. Now, let us say that we have a crash, and we need to apply the journal. We are going to scan through the journal, and only write Page #23 from tx #43. For the most part, we don’t care about the older versions of the page. That is all well and good, until incremental backups come into play.

The default incremental backup approach used by Voron is simplicity itself. Just create a stable copy of the journal files, and you are pretty much done, there isn’t anything else that you need to do. Restoring involves applying those transactions in the order they appear in the journal, and rainbows and unicorns and peace on Earth.

However, in many cases, the journal files contain a lot of outdated data. I mean, we really don’t care about the state of Page #23 in tx #42. This came into play when we started to consider how we’ll create Raft snapshots on an ongoing basis for large datasets. Just using incremental backups was enough to show a lot of promise, but it also exposed the problem of the journal size growing too large.

That is when we introduced the notion of a minimal incremental backup. A minimal incremental backup is a lot more complex to create, but it actually restores in the exact same way as a normal incremental backup.

Conceptually, it is a very simple idea. Read the journals, but instead of just copying them as is, scan through them and find the latest version of all the pages that appear in the journal. In this case, we’ll have a Page #23 from tx #43. And then we generate a single transaction for all the latest versions of all the pages in the transactions we read.
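
To make the idea concrete, here is a minimal sketch in Python (not Voron's actual code). The `JournalEntry` type and the `merge_latest_pages` function are hypothetical names invented for this illustration, and a real journal also carries headers and checksums that are ignored here. The sample entries mirror the Page #23 example above.

```python
from typing import NamedTuple


class JournalEntry(NamedTuple):
    tx_id: int        # transaction that wrote this version of the page
    page_number: int  # which page was written
    data: bytes       # page contents as of that transaction


def merge_latest_pages(journal: list[JournalEntry]) -> dict[int, JournalEntry]:
    """Keep only the newest version of every page that appears in the journal."""
    latest: dict[int, JournalEntry] = {}
    for entry in journal:
        current = latest.get(entry.page_number)
        if current is None or entry.tx_id >= current.tx_id:
            latest[entry.page_number] = entry
    return latest


# Three transactions that all touched page 23.
journal = [
    JournalEntry(41, 23, b"one user"),
    JournalEntry(42, 23, b"two users"),
    JournalEntry(43, 23, b"three users"),
]
merged = merge_latest_pages(journal)
print(merged[23].tx_id)
```

The merged dictionary is what would be written out as a single transaction, so the backup size depends on the number of distinct pages touched rather than on the number of transactions.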

We tried it on the following code:

for (int xi = 0; xi < 100; xi++)
{
    using (var tx = envToSnapshot.NewTransaction(TransactionFlags.ReadWrite))
    {
        var tree = envToSnapshot.CreateTree(tx, "test");

        for (int i = 0; i < 1000; i++)
        {
            tree.Add("users/" + i, "john doe/" + i);
        }

        tx.Commit();
    }
}


This is an interesting experiment, because we are making modifications to the same keys (and hence, probably the same pages), multiple times. This also reflects a common scenario in which we have a high rate of updates.

Min incremental backup created an 8 KB file to restore this, while the standard incremental backup created a 67 KB file for the same purpose.

That doesn’t sound like much, until you realize that those are compressed files, and the uncompressed sizes were 80 KB for the minimal incremental backup and 1.57 MB for the incremental backup. Restoring the min incremental backup is a lot more efficient. However, it ain’t all roses.

An incremental backup gives you the ability to replay transactions one at a time. A min incremental backup will merge transactions, so you get the same end result, but you can’t stop midway (for example, to do a partial rollback). Taking a min incremental backup is also more expensive: instead of doing primarily file I/O, we have to read the journals, understand them, and output the set of pages that we actually care about.

For performance and ease of use, we limit the size of a merged transaction generated by a min incremental backup to about 512 MB. So if you have made changes to over 512MB of your data since the last backup, we’ll still generate a merged view of all the pages, but we’ll actually apply that across multiple transactions. The idea is to avoid trying to apply a very big transaction and consume all resources from the system in this manner.
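
As a rough illustration of that cap, here is a Python sketch that splits a merged set of pages into several transactions. The 4 KB page size and the `split_into_transactions` name are assumptions made for this example; the only number taken from the text is the limit of about 512 MB.

```python
PAGE_SIZE = 4096                    # assumed page size in bytes (illustrative)
MAX_TX_BYTES = 512 * 1024 * 1024    # the "about 512 MB" cap described above


def split_into_transactions(page_numbers, page_size=PAGE_SIZE, max_tx_bytes=MAX_TX_BYTES):
    """Group the merged pages into batches that each stay under the size cap."""
    pages_per_tx = max(1, max_tx_bytes // page_size)
    pages = sorted(page_numbers)
    return [pages[i:i + pages_per_tx] for i in range(0, len(pages), pages_per_tx)]


# Tiny numbers so the result is easy to read: 10 pages, at most 4 pages per transaction.
batches = split_into_transactions(range(10), page_size=4096, max_tx_bytes=4 * 4096)
print(batches)
```

Each batch would be applied as its own transaction on restore, so no single transaction grows past the cap.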

Note that this feature was developed specifically to enable better behavior when snapshotting state machines for use within the Raft protocol. Because Raft uses snapshots to avoid having an infinite log, and because the Voron journal is effectively the log of all the disk changes made in the system, that was something that we had to do. Otherwise we couldn’t rely on incremental backups for snapshots (we would have just swapped the Raft log for the Voron journal, probably with no saving in space). That would have forced us to rely on full backups, and we don’t want to take a multi GB backup very often if we can avoid it.

Gossip much? Use cases and bad practices for gossip protocols

My previous few posts have talked about specific algorithms for gossip protocols, specifically HyParView and Plumtrees. They dealt with the technical behavior of the system, the process by which we send data over the cluster to all the nodes. In this post, I want to talk a bit about what kind of messages we are going to send in such a system.

The obvious one is to try to keep the entire state of the system up to date using gossip. So whenever we make a change, we gossip about it to the entire network, and we are able to get to an eventually consistent system in which all nodes have roughly the same data. There is one problem with that: you now have a lot of nodes with the same data on them. At some point, that stops making sense. Usually gossip is used when you have a large group of servers, and just keeping all the data on all the nodes is not a good idea unless your data set is very small.

So you don’t do that.  Gossip is usually used to disseminate a small data set, one that can fit comfortably inside a single machine (usually it is a very small data set, a few hundred MB at most). Let us consider a few types of messages that would fit in a gossip setting.

The obvious example is the actual topology of the whole network. A node joining the cluster will announce its presence, and that will percolate to the entire cluster, eventually. That can give you an idea (note, this isn’t a certainty) about what the structure of the cluster is, and maybe let you make decisions based on it.

The system wide configuration data is also a good candidate for gossip. For example, you can use gossip as a distributed service locator in the cluster. Whenever a new SMTP server comes online, it announces itself via gossip to the cluster. It is added to the list of SMTP servers in all the nodes that heard about it, and then it gets used. In this kind of system, you have to take into account that servers can be down for a long period of time, and miss out on messages. Gossip does not guarantee that the messages will arrive, after all. Oh, it is going to do its best, but you also need to build an anti entropy system. If a server finds that it missed out on too much, it can request one of its peers to send it a full snapshot of the current global state as that peer knows it.
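
Here is a small Python sketch of that service locator with a crude anti entropy step. Everything in it is hypothetical: the `ServiceRegistry` class, the idea of using a simple counter as the "how far behind am I" measure, and the `max_lag` threshold are assumptions for illustration, not something a particular gossip library provides.

```python
class ServiceRegistry:
    """Per-node view of the services announced over gossip (illustrative only)."""

    def __init__(self, max_lag=3):
        self.services = {}      # service kind -> set of addresses
        self.version = 0        # number of distinct announcements applied so far
        self.max_lag = max_lag  # how far behind we tolerate being before a full sync

    def on_announcement(self, kind, address):
        known = self.services.setdefault(kind, set())
        if address not in known:
            known.add(address)
            self.version += 1

    def snapshot(self):
        # Copy the sets so the receiver does not share state with us.
        return self.version, {k: set(v) for k, v in self.services.items()}

    def sync_with(self, peer):
        """Anti entropy: if we missed too much, take the peer's full snapshot."""
        if peer.version - self.version > self.max_lag:
            self.version, self.services = peer.snapshot()
            return True
        return False


up = ServiceRegistry()
down = ServiceRegistry()   # this node was offline and heard nothing
for i in range(5):
    up.on_announcement("smtp", f"smtp-{i}.example.internal")
caught_up = down.sync_with(up)
print(caught_up, sorted(down.services["smtp"]))
```

A counter is a weak measure of divergence (two nodes can have the same count and different contents), so a real system would compare digests or version vectors instead.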

Going in the same vein, nodes can gossip about the health state of the network. If I’m trying to send an email via an SMTP server, and it is down, I’m going to try another server, and let the network know that I’ve failed to talk to that particular server. If enough nodes fail to communicate with the server, that becomes part of the state of the system, so nodes that learned about it can avoid that server for a period of time.
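
A minimal Python sketch of the "enough nodes failed" rule might look like this. The `HealthView` class and the threshold of 3 distinct reporters are assumptions for the example, and the "for a period of time" part (expiring old reports) is left out to keep it short.

```python
class HealthView:
    """Tracks which nodes reported a failure to reach which server (illustrative)."""

    def __init__(self, threshold=3):
        self.threshold = threshold
        self.reports = {}   # server -> set of node ids that failed to reach it

    def on_failure_report(self, reporter, server):
        # A set means a node repeating its complaint is only counted once.
        self.reports.setdefault(server, set()).add(reporter)

    def is_suspect(self, server):
        return len(self.reports.get(server, ())) >= self.threshold


view = HealthView(threshold=3)
view.on_failure_report("node-1", "smtp-2")
view.on_failure_report("node-1", "smtp-2")   # same reporter again, still one vote
view.on_failure_report("node-4", "smtp-2")
suspect_after_two = view.is_suspect("smtp-2")
view.on_failure_report("node-7", "smtp-2")
suspect_after_three = view.is_suspect("smtp-2")
print(suspect_after_two, suspect_after_three)
```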

Moving in a different direction, you can also do gossip queries. That can be done by sending a gossip message to the cluster with a specific query in it. A typical example might be “which node has a free 10GB that I can use?”. Such queries typically carry with them a timeout element. You send the query, and any matches are sent back to you (either directly or also via gossip). After a predefined timeout, you can assume that you got all the replies that you are going to get, so you can operate on that. More interesting is when you want to query for the actual data held in each node, if we want to find all the users who logged in today, for example.

The problem with doing something like that is that you might have a large result set, and you’ll need to have some way to work with that. You don’t want to send it all to a single destination, and what would you do with it, anyway? For that reason, most of the time gossip queries are actually aggregation. We can use that to get an estimate of certain things in our cluster. If we wanted to get the number of users per country, that would be a good candidate for this, for example. Note that you won’t necessarily get accurate results, if you have failures, so there are aggregation methods for getting a good probability of the actual value.
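
As a sketch of that kind of aggregation, the Python below keeps the latest counts reported by each node and sums them on demand. The `CountryAggregate` class and the country codes are made up for the example. Keeping one slot per node (and replacing it rather than adding to it) is one simple way to stay correct when gossip delivers the same report twice; nodes that never report are simply missing from the estimate.

```python
from collections import Counter


class CountryAggregate:
    """Sums per-country user counts reported by the nodes we heard from."""

    def __init__(self):
        self.per_node = {}   # node id -> latest Counter reported by that node

    def on_report(self, node_id, counts):
        # Replace instead of add, so a duplicated gossip message is harmless.
        self.per_node[node_id] = Counter(counts)

    def estimate(self):
        total = Counter()
        for counts in self.per_node.values():
            total.update(counts)
        return total


agg = CountryAggregate()
agg.on_report("A", {"IL": 2, "US": 1})
agg.on_report("B", {"US": 4})
agg.on_report("A", {"IL": 2, "US": 1})   # duplicate delivery of A's report
estimate = agg.estimate()
print(estimate["US"], estimate["IL"])
```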

For fun, here is an interesting exercise. Look at trending topics in a large number of conversations. In this case, whenever you get a new message, you analyze the topics for this message, and periodically (every second, let us say), you gossip to your peers about this. In this case, we don’t just blindly pass the gossip between nodes. Instead, we’ll use a slightly different method. Each second, every node will contact its peers to send them the current trending topics in the node. Each time the trending topics change, a version number is incremented. In addition, the node also sends its peer the node ids and versions of the messages it got from other nodes. The peer, in reply, will send a confirmation about all the node ids and versions that it has. So the origin node can fill in any new information that it got, or ask to get updates for information that it doesn’t have.

This reduces the number of updates that flow throughout the cluster, while still maintaining an eventually consistent model. We’ll be able to tell, from each node, what the current trending topics are globally.
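
The version exchange described above can be sketched in Python roughly like this. The `TopicNode` class and the `exchange` function are hypothetical, and the sketch runs both sides in one process instead of over a network. The point it tries to show is that after the first exchange, a second one transfers nothing.

```python
class TopicNode:
    """Holds the latest (version, topics) known for every origin node."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.known = {node_id: (0, [])}   # origin node id -> (version, topics)

    def set_local_topics(self, topics):
        version, current = self.known[self.node_id]
        if list(topics) != current:       # only bump the version on a real change
            self.known[self.node_id] = (version + 1, list(topics))

    def digest(self):
        # Just ids and versions: this is the small message sent every second.
        return {origin: version for origin, (version, _) in self.known.items()}

    def newer_than(self, digest):
        return {o: entry for o, entry in self.known.items()
                if entry[0] > digest.get(o, -1)}

    def apply(self, updates):
        for origin, (version, topics) in updates.items():
            if version > self.known.get(origin, (-1, []))[0]:
                self.known[origin] = (version, list(topics))


def exchange(a, b):
    """One round: swap digests, then each side sends only what the other lacks."""
    to_b = a.newer_than(b.digest())
    to_a = b.newer_than(a.digest())
    b.apply(to_b)
    a.apply(to_a)
    return len(to_a) + len(to_b)   # number of full entries that had to be sent


a, b = TopicNode("A"), TopicNode("B")
a.set_local_topics(["raft"])
b.set_local_topics(["gossip"])
first = exchange(a, b)
second = exchange(a, b)
print(first, second)
```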

Gossip much? Operating with partial information, and getting the right result.

Unlike the previous two posts, this is going to be short, primarily because what I wanted to talk about is what impresses me most about both HyParView and Plumtree. The really nice thing about them is that they are pretty simple, easy to understand and produce good results.

But the fun part, and what makes it impressive, is that they manage to achieve that with a small set of simple rules, and without any attempt to create a global view. Each node has potentially only a very small part of the overall data, but they still manage to operate, self optimize and get to the correct result. In fact, I did some very minor attempts to play with this at large scale, and we see a pretty amazing emergent behavior. Without anyone knowing what is going on globally, we are able to get to the optimal number of interactions in the cluster to distribute information.

That is really pretty cool.

And because this post is too short, I’ll leave you with a question. Given that you have this kind of infrastructure, what would you do with it? What sort of information or operations would you try to send this way?

Gossip much? The gossip epidemic and other issues in polite society

In my previous post, I talked about the Hybrid Partial View protocol, and showed a visualization of how it actually works. Something that is important to note about this protocol is that it is mostly meant to create a gossip topology that is resilient to failure. It is not meant to actually send messages; it is meant to serve as the backbone topology (the peer sampling service) for figuring out what the nodes are.

The reason for that can be seen in the following 10 node cluster (after running heartbeat enough times to get to a stable state):

image

Let us assume that we want to disseminate a message across the cluster. We select node F as our root, and then send a message. The rules are as follows:

  • Each node sends the message to all its active connections (except the sender, of course).
  • A node that got the same message twice will ignore the message.

Based on those rules, and the topology above, we’re going to have the following chain of messages:

  • F – initial broadcast
  • F -> E, G, J
  • E -> G, I
  • G -> E, H
  • J -> I, H, J
  • H -> C, I
  • C -> B, A
  • B -> D, A
  • A -> B, D
  • D -> A

The total number of messages passed is 20, which is twice as many as the optimal solution would generate.
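
The two rules above are easy to simulate. The Python sketch below floods a message through a small, made-up 4 node graph (not the 10 node cluster from the image, which can't be reproduced from the text) and counts the messages. Note that, unlike the list above, it does not count the initial injection at the root as a message.

```python
from collections import deque


def eager_broadcast(graph, root):
    """Flood from root following the two rules; return (messages_sent, nodes_reached)."""
    seen = {root}
    queue = deque([(root, None)])   # (node, the node it first heard the message from)
    messages = 0
    while queue:
        node, sender = queue.popleft()
        for peer in graph[node]:
            if peer == sender:
                continue            # rule 1: never send back to the sender
            messages += 1
            if peer not in seen:    # rule 2: a duplicate is counted but ignored
                seen.add(peer)
                queue.append((peer, node))
    return messages, seen


# Hypothetical topology: 4 nodes, 5 undirected links.
graph = {
    "A": ["B", "C"],
    "B": ["A", "C", "D"],
    "C": ["A", "B", "D"],
    "D": ["B", "C"],
}
messages, reached = eager_broadcast(graph, "A")
print(messages, sorted(reached))
```

With 4 nodes, 3 messages would be enough to reach everyone, so more than half of the traffic in this run is redundant.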

What is worse, this is a very small network, and as the network grows, so will the number of redundant messages. This approach (called eager gossiping) has a major advantage: because it will traverse all paths in the graph, it will also traverse all the shortest paths. That means that the time to get a message from the origin to all nodes is the smallest possible, but the number of operations is high.

The Plumtree paper (Epidemic Broadcast Trees) presents a solution to this problem. It tries to minimize the number of messages that are passed, as well as the distance they have to travel, while still maintaining reliability.

The way Plumtree works is explained in quite beautiful detail in the paper, but the underlying idea goes like this: we start using the same approach as eager gossiping, but whenever we get a message that we already got, we reply to the source and tell it to stop sending us further messages. This is done so that the next time a message is sent, we can skip the known duplicate path and reduce the overall number of messages.

So the first run is going to generate 20 messages on the network. The second is going to generate just 13, you can see the non traversed paths in the following image:

image

Note that we didn’t pass any messages between I and J, or D and A. But a lot of the saving was achieved by avoiding duplicate notifications. So node I notified node H, but not vice versa. The next time we run this, we have exactly 10 messages passing:

image
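
Here is a small Python sketch of the pruning idea, using a made-up 4 node graph rather than the cluster in the images. The `plumtree_round` function is a hypothetical name, and this is a simplification: prunes are collected during a round and applied at the end of it, so the tree settles after a single round here, while the visualization above needed two runs to get from 20 to 10. Payload messages are counted, prune replies are not.

```python
from collections import deque


def plumtree_round(eager, root):
    """Broadcast over the eager links, then prune every link that delivered a duplicate."""
    seen = {root}
    queue = deque([(root, None)])
    messages = 0
    prunes = []
    while queue:
        node, sender = queue.popleft()
        for peer in sorted(eager[node]):
            if peer == sender:
                continue
            messages += 1
            if peer in seen:
                prunes.append((node, peer))   # peer replies: "stop sending to me"
            else:
                seen.add(peer)
                queue.append((peer, node))
    for a, b in prunes:                       # both ends stop using the link eagerly
        eager[a].discard(b)
        eager[b].discard(a)
    return messages


graph = {
    "A": ["B", "C"],
    "B": ["A", "C", "D"],
    "C": ["A", "B", "D"],
    "D": ["B", "C"],
}
eager = {node: set(peers) for node, peers in graph.items()}
first_round = plumtree_round(eager, "A")
second_round = plumtree_round(eager, "A")
print(first_round, second_round)
```

After the first round the remaining eager links form a spanning tree rooted at A, which is why the second round needs only one message per non-root node.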

Now, obviously this is pretty cool, but that is under a stable state. What happens when there are failures? Well, at that point, the notion of lazy vs. eager peers comes into play. One of the things we did initially was to clear the duplicate paths in the network, so we can optimize the number of messages being passed. That is pretty cool, but it also leaves us vulnerable to failures. For example, imagine that node H is down. What happens then?

There are two aspects of this that are interesting. Plumtrees only care about the notion of message passing. They don’t deal with topology changes. In this case, the responsibility to join the different parts of the network lies with the peer sampling service, which is HyParView in this case. That would figure out the communication issue, and forge new connections with the remaining nodes. Plumtree will get notified about that, and the process continues.

But let us leave that one aside. Let us say that we have a static topology; how would Plumtree handle this? Well, at this point you have to realize that Plumtree doesn’t just drop a connection when a node tells it that it already heard about a message. It just moves it to a lazy state. Periodically, a node will contact the other nodes that told it that it wasn’t needed and tell them: “Hi, I got messages with ids (43,41,81), do you have them?”. In this way, a node whose contact point went down would become aware that there are missing messages. At that point, it starts a timer, and if it doesn’t hear about those missing messages before the timer expires, it will ask the node that told it about those messages to send them over, and initiate an active link. The end result here is that we send additional messages, but those tend to be pretty small, just the message ids.

During steady state, we’ll just ignore those messages, but if there is a failure, they can help us recover from errors by letting us know that there are messages that we are missing, and taking action to recover that.
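
The bookkeeping on the receiving side can be sketched in Python like this. The `LazyPeerState` class is hypothetical and the timer is reduced to an explicit `on_timer_expired` call; in the paper the announcements are called IHAVE messages and the request to send the payload and reactivate the link is called GRAFT.

```python
class LazyPeerState:
    """Tracks message ids we heard about from lazy peers but never received."""

    def __init__(self):
        self.received = set()   # ids of messages whose payload we already have
        self.missing = {}       # message id -> lazy peer that announced it first

    def on_payload(self, msg_id):
        self.received.add(msg_id)
        self.missing.pop(msg_id, None)   # no longer missing, nothing to repair

    def on_ihave(self, announcer, msg_ids):
        for msg_id in msg_ids:
            if msg_id not in self.received:
                self.missing.setdefault(msg_id, announcer)

    def on_timer_expired(self):
        """Return {peer: [message ids to request]} for everything still missing."""
        requests = {}
        for msg_id, peer in sorted(self.missing.items()):
            requests.setdefault(peer, []).append(msg_id)
        return requests


state = LazyPeerState()
state.on_payload(43)                  # arrived normally over an eager link
state.on_ihave("H", [43, 41, 81])     # lazy peer H announces what it has
state.on_payload(41)                  # 41 shows up before the timer fires
requests = state.on_timer_expired()
print(requests)
```

In steady state `missing` stays empty and the announcements cost almost nothing; it only fills up when an eager path has failed or is slow.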

There is also another important aspect of this behavior: detecting and circumventing slow nodes. If a node is slow to distribute messages to its peers, other nodes will notify those peers that those messages exist, and if that is the case, we’ll eventually move to a more efficient topology by routing around that slow node.

You can see a full visualization of that (and admire my rapidly improving UI skills) here. The JavaScript implementation of the algorithm is here.

Plumtree has a few weaknesses; the main one is that it is optimized for a single source topology. In other words, the first node you start from will influence the optimization of the network, and if you start a broadcast from another node, it won’t be an optimal operation. That said, there are a few ways to handle that. The actual topology remains the same; what influences Plumtree is the rejection replies from nodes that say that the information it transmitted was already received. We can keep track of not only the nodes that rejected us, but also the root source of the rejected message, so a message originating in E wouldn’t stop us from propagating a message originating in J.

Because Plumtree is meant for very large clusters (the paper talks about testing this with 10,000 nodes), and you might have a message originate from any one of those, you probably want to limit the notion of “origin”. If you track the past three nodes a message passed through, you get a reasonably small amount of data that you have to keep, and it is likely to be accurate enough to build multiple topologies that will optimize themselves based on actual conditions.

That is it for this post, I’ve got a couple more posts that I want to write about gossips, but that would be it for today.

Gossip much? Disseminating information among high number (10K) of nodes

Every once in a while, I like to sit down and read about what is going on outside my current immediate field of interest. This weekend, I chose to focus on efficient information dissemination with a very large number of nodes.

The articles of interest for this weekend are HyParView and Epidemic Broadcast Trees (Plumtrees). They are a great read, and complement one another to a nice degree. HyParView is an algorithm that seeks to connect a (potentially very large) set of nodes together without trying to make each node connect to every other node. To simplify things, I’m going to talk about clusters of several dozen nodes; the algorithms in both articles have been tested with up to 10,000 nodes and with failure rates of up to 95% of the network. This post is here so I can work out the details in my mind; it may be that I’m wrong, so don’t treat this as a definitive source.

Let us assume that we have a network with 15 nodes in it, and we want to add a new node. One way of doing that would be to maintain a list of all the nodes in the system (that is what admins are for, after all) and have the node connect to all the other nodes. In this way, we can communicate between all the nodes very easily. Of course, that means that the number of connections we have in a network of 16 (15 + new) nodes is 120. And that utterly ignores the notion of failure. But let us continue with this path, to see what unhappy landscape it is going to land us on.

We have a 15 node cluster, and we add a new node (so we have to give it all the other nodes), and it connects to all the other nodes and registers with them. So far, so good. Now, let us say that there is a state change that we want to send to all the nodes in the network. We can do that by connecting to a single node, and having it distribute this information to all the other nodes. The cost of this would be 16 (1 to talk to the first node, then 15 for it to talk to the rest). That is very efficient, and it is easy to prove that this is indeed the optimal way to disseminate information over the network (each node is only contacted once).
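
The numbers here are just the count of pairs in a fully connected network, n(n-1)/2, and one message per node for the broadcast. A tiny Python check (the function names are made up for the example):

```python
def full_mesh_connections(nodes: int) -> int:
    """Every node connected to every other node: n * (n - 1) / 2 links."""
    return nodes * (nodes - 1) // 2


def single_hub_broadcast_cost(nodes: int) -> int:
    """1 message to reach the first node, then one for each remaining node."""
    return 1 + (nodes - 1)


print(full_mesh_connections(16), single_hub_broadcast_cost(16))
print(full_mesh_connections(100))
```

The broadcast cost grows linearly with the cluster, but the number of connections grows quadratically, which is where the trouble starts.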

In a 16 node network, maybe that is even feasible. It is a small cluster, after all. But that is a big maybe, and I wouldn’t recommend it. If we grow the cluster to 100 nodes, that gives us 4,950(!) connections between all nodes, and the cost of sending a single piece of information is still the optimal N. But I think it is easy to see that this isn’t the way to go about it, mostly because you can’t do that, not even for the 16 node cluster. Usually when we talk about clusters we like to think about them as flat topologies, but that isn’t actually how it goes. Let us look at a better approximation of a real topology:

image

Yes, this isn’t quite right, but it is good enough for our purposes.

In this 16 node cluster, we have the green node, which is the one we initially contact to send some data to the entire cluster. What would happen if we tried to talk from that node to all the other nodes? Well, notice how much load it would place on the green node’s router, or the general cost for the network in the area of the green node. Because of that, a straight direct connection to the entire cluster is not something that you really want to do.

An alternative, assuming that you have a fixed topology, is to create a static tree structure. You start with the green node, it then contacts three other nodes, which then each contact four other nodes. We still have the nice property that each node gets the new information only once, but we can parallelize the communication and reduce the load on a single segment of the network.

Which is great, if we have a static topology and zero failures. In practice, none of those is true, so we want something else, and hopefully something that would make this a lot easier to handle. This is where HyParView comes into play. I sat down and wrote a whole big description of how HyParView works, but it wasn’t something that you couldn’t get from the article. And one of the things that I did along the way was create a small implementation in JavaScript and plug this into a graph visualization, so I could follow what is going on there.

That means that I had to implement the HyParView protocol in JavaScript, but it turned out to be a great way to actually explore how the thing works, and it ended up with a great visualization.

You can see it in action in this url, and you can read the actual code for the HyParView protocol here.

Here is the cluster at 5 nodes, just after we added E:

image

And here it is at 9 nodes, after it had a chance to be stable.

image

Note that we have the connections (active view) from each node to up to 3 other nodes, but we also have other letters next to the node name, in []. That is the passive list, the list of nodes that we are not connected to, but will try if our connection to one of the nodes in the active list goes down.
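
The relationship between the two lists can be sketched in a few lines of Python. This is a heavy simplification under stated assumptions, not the protocol from the paper: the `Peer` class is hypothetical, joins simply fill the active list first, and promotion on failure skips the handshake with the promoted node. The seeded random generator is there for the same reason as the predictable RNG in the visualization, so runs are repeatable.

```python
import random


class Peer:
    """A node with a small active view and a passive list of backups."""

    def __init__(self, name, max_active=3, rng=None):
        self.name = name
        self.max_active = max_active
        self.active = []     # nodes we hold open connections to
        self.passive = []    # nodes we know about but are not connected to
        self.rng = rng or random.Random(0)   # seeded for repeatable runs

    def learn_about(self, other):
        if other == self.name or other in self.active or other in self.passive:
            return
        if len(self.active) < self.max_active:
            self.active.append(other)
        else:
            self.passive.append(other)

    def on_peer_failed(self, failed):
        if failed in self.active:
            self.active.remove(failed)
            if self.passive:
                # Promote a random backup to fill the hole in the active view.
                replacement = self.rng.choice(self.passive)
                self.passive.remove(replacement)
                self.active.append(replacement)
        elif failed in self.passive:
            self.passive.remove(failed)


node = Peer("A")
for other in ["B", "C", "D", "E", "F"]:
    node.learn_about(other)
before = (list(node.active), list(node.passive))
node.on_peer_failed("C")
print(before, node.active, node.passive)
```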

In addition to just adding themselves to one of the nodes, the nodes will also attempt to learn the topology of the network in such a way that if there is a failure, they can recover from it. The JavaScript code I wrote is not good JavaScript code; that isn’t my forte, but it should be enough to follow what is going on there. We are able to do very little work to have a self organizing system of nodes that discover the network.

Note that in large networks, none of the nodes would have the full picture of the entire network, but each node will have a partial view of it, and that is enough to send a message through the entire network. But I’m going to talk about this in another post.

In the meantime, go to this url and see it in action (the actual code for the HyParView protocol is here). Note that I've made the different actions explicit, so you need to do heartbeats (and the algorithm relies on them for healing failures) to get proper behavior for the system. I've also created a predictable RNG, so we can always follow the same path in our iterations.

End of year discount for all our products

To celebrate the new year, we offer a 21% discount for all our products. This is available for the first 33 customers that use the coupon code: 0x21-celebrate-new-year

In previous years, we offered a similar number of uses for the coupon code, and they ran out fast, so hurry up. This offer is valid for:

Happy holidays and a great new year.

On a personal note, this marks the full release of all our product lines, and it took an incredible amount of work. I'm very pleased that we have been able to get the new version out there and in your hands, and to have you start making use of the features that we have been working on for so long.

Tail/Feather–Snapshots

The Raft protocol gives us a stable replicated distributed log. In other words, all servers in the cluster will agree on all the committed entries to the log (both what they are, and in what position). We usually fill the log with operations that a state machine will execute.

In the Tail/Feather example, the commands are set/del operations on the key value store. Note that this doesn’t mean that all servers will always have the same state. It is possible that a server (or set of servers) will have an outdated view of the log, but the log that they have will match up to the point that they have it.

So, what is the problem? What happens when we have an active system? Well, every time that we make a modification, we’ll add it to the log. That is all good and great, but what about the actual log? Well, it is going to stay there, we need it so we can catch up any new server that will join the cluster. But that means that over time, we are going to have an unbounded growth. Which isn’t a very nice thing to have.

Rachis handles this by asking the state machine to implement snapshots: a way to take the current state of the state machine and transmit it over the network. For example, assume that we have a log full of these entries:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 2}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 3}

// ...

{ Op: "Add", Key: "users/1/login-attempts", "Value": 300000}

The log for that is 300,000 entries long, but the current state of the machine is:

{ "users/1/login-attempts": 300000 }

Which is obviously much smaller. Rachis doesn’t force a state machine to implement this, but if it isn’t doing so, we can never clear the log. But implementing snapshots has its own problems.
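
To see the difference in size, here is a Python sketch that replays such a log into a dictionary. The `apply_entry` function is a stand-in for the state machine, not the Tail/Feather code, and it assumes that "Add" stores the given value for the key, as the final state above suggests.

```python
def apply_entry(state, entry):
    """Apply one log entry to the key/value state (a stand-in state machine)."""
    if entry["Op"] == "Add":
        state[entry["Key"]] = entry["Value"]
    elif entry["Op"] == "Del":
        state.pop(entry["Key"], None)


# 300,000 entries that all touch the same key.
log = [
    {"Op": "Add", "Key": "users/1/login-attempts", "Value": value}
    for value in range(1, 300_001)
]

state = {}
for entry in log:
    apply_entry(state, entry)

print(len(log), state)
```

A snapshot only needs to carry `state`, so after taking it, the 300,000 entries that produced it can be dropped from the log.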

What about the actual cost of creating the snapshot? Imagine that we ask the state machine for a snapshot every 10,000 entries. In the example above, that would mean just writing out { "users/1/login-attempts": 300000 } or whatever the actual current value is. But consider this log instead:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/2/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/3/login-attempts", "Value": 1}

// ...

{ Op: "Add", Key: "users/300000/login-attempts", "Value": 1}

Note that instead of having 300,000 changes to the same key, we are going to have 300,000 keys. In this case, writing the full list down on every snapshot is very expensive. That is what incremental backups are here to solve.  We let Voron know that this is what we want by specifying:

options.IncrementalBackupEnabled = true;

And now it is time to define policies about taking snapshots. We are going to handle this using Voron full & incremental snapshots. You can see the logic in the following code.

public void CreateSnapshot(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    // we have no snapshot files, so this is the first time that we create a snapshot
    // we handle that by asking voron to create a full backup
    var files = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");
    Array.Sort(files, StringComparer.OrdinalIgnoreCase); // make sure we get it in sort order
    if (files.Any() == false)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    string lastFullBackup = null;
    int fullBackupIndex = -1;
    for (int i = files.Length - 1; i >= 0; i--)
    {
        // GetFiles returns full paths, so compare against the file name only
        if (!Path.GetFileName(files[i]).StartsWith("Full"))
            continue;
        fullBackupIndex = i;
        lastFullBackup = files[i];
        break;
    }
            
    if (lastFullBackup == null)
    {
        // this shouldn't be the case, we must always have at least one full backup. 
        // maybe user deleted it? We'll do a full backup here to compensate
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
            
    var fullBackupSize = new FileInfo(lastFullBackup).Length;
    var incrementalBackupsSize = files.Skip(fullBackupIndex + 1).Sum(f => new FileInfo(f).Length);

    // now we need to decide whether to do a full or incremental backup, doing incremental backups stops
    // making sense if they take too much space compared to the full backup. Our cutoff point is when they pass
    // 50% of the size of the full backup.
    // If the full backup size is 1 GB, and we have 25 incremental backups that are 600 MB in size, we need to transfer
    // 1.6 GB to restore. If we generate a new full backup, we'll only need to transfer 1 GB to restore.

    if (incrementalBackupsSize > fullBackupSize / 2)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }

    DeleteOldSnapshots(files.Take(fullBackupIndex)); // delete snapshots older than the current full backup

    var incrementalBackup = new IncrementalBackup();
    incrementalBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Inc-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set);
}

private void DoFullBackup(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    var snapshotsToDelete = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");

    var fullBackup = new FullBackup();
    fullBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Full-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set
        );

    DeleteOldSnapshots(snapshotsToDelete);
}

private static void DeleteOldSnapshots(IEnumerable<string> snapshotsToDelete)
{
    foreach (var snapshot in snapshotsToDelete)
    {
        try
        {
            File.Delete(snapshot);
        }
        catch (Exception)
        {
            // we ignore snapshots we can't delete, they are expected if we are concurrently writing
            // the snapshot and creating a new one. We'll get them the next time.
        }
    }
}

Basically, we need to strike a balance between full and incremental backups. We do that by first taking a full backup, and then taking incremental backups until the incremental backups take more than 50% of the size of the full backup, at which point we are probably better off doing another full backup. Note that we use the event of a full backup to clear the old incremental and full backup files.
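Stripped of the file handling, the policy boils down to a small decision function. This is only a sketch of the rule as described in the text; BackupKind and Decide are names invented for the illustration and are not part of Voron or Rachis.

```csharp
public enum BackupKind
{
    Full,
    Incremental
}

public static class SnapshotPolicySketch
{
    // lastFullBackupSize is null when no full backup exists yet (or it was deleted).
    public static BackupKind Decide(long? lastFullBackupSize, long incrementalBackupsSize)
    {
        // No full backup to build on, so we have to start with one.
        if (lastFullBackupSize == null)
            return BackupKind.Full;

        // Once the incremental backups pass 50% of the full backup size,
        // a fresh full backup is assumed to be the cheaper thing to restore from.
        if (incrementalBackupsSize > lastFullBackupSize.Value / 2)
            return BackupKind.Full;

        return BackupKind.Incremental;
    }
}
```

With a 1,000 MB full backup, 400 MB of incremental backups keeps us on the incremental path, and 600 MB tips us over into a new full backup.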

And with that, we can move to actually sending the snapshot over the wire. This is exposed by the GetSnapshotWriter() method, which just hands off all the responsibility to the SnapshotWriter:

public ISnapshotWriter GetSnapshotWriter()
{
    return new SnapshotWriter(this);
}

public class SnapshotWriter : ISnapshotWriter
{
    private readonly KeyValueStateMachine _parent;

    private List<FileStream> _files = new List<FileStream>();

    public SnapshotWriter(KeyValueStateMachine parent)
    {
        _parent = parent;
        var files = Directory.GetFiles(_parent._storageEnvironment.Options.BasePath, "*.Snapshot");
        var fullBackupIndex = GetFullBackupIndex(files);

        if (fullBackupIndex == -1)
            throw new InvalidOperationException("Could not find a full backup file to start the snapshot writing");

        var last = Path.GetFileNameWithoutExtension(files[files.Length-1]);
        Debug.Assert(last != null);
        var parts = last.Split('-');
        if(parts.Length != 3)
            throw new InvalidOperationException("Invalid snapshot file name " + files[files.Length - 1] + ", could not figure out index & term");

        Index = long.Parse(parts[1]);
        Term = long.Parse(parts[2]);

        for (int i = fullBackupIndex; i < files.Length; i++)
        {
            _files.Add(File.OpenRead(files[i]));
        }
    }

    public void Dispose()
    {
        foreach (var file in _files)
        {
            file.Dispose();
        }
    }

    public long Index { get; private set; }
    public long Term { get; private set; }
    public void WriteSnapshot(Stream stream)
    {
        var writer = new BinaryWriter(stream);
        writer.Write(_files.Count);
        foreach (var file in _files)
        {
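            // FileStream.Name is the full path, send only the file name so the
            // receiver can combine it with its own base path
            writer.Write(Path.GetFileName(file.Name));
            writer.Write(file.Length);
            writer.Flush();
            file.CopyTo(stream);
        }
    }
}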

What is going on here? We get the snapshot files, and find the latest full backup, then we open all the files that we’ll need for the snapshot (the last full backup and everything afterward). We need to open them in the constructor so that they are locked and the CreateSnapshot() method cannot delete them while we are still sending them.

Then we just concatenate them all and send them over the wire. And getting them? That is pretty easy as well:

public void ApplySnapshot(long term, long index, Stream stream)
{
    var basePath = _storageEnvironment.Options.BasePath;
    _storageEnvironment.Dispose();

    foreach (var file in Directory.EnumerateFiles(basePath))
    {
        File.Delete(file);
    }

    var files = new List<string>();

    var buffer = new byte[1024*16];
    var reader = new BinaryReader(stream);
    var filesCount = reader.ReadInt32();
    if (filesCount == 0)
        throw new InvalidOperationException("Snapshot cannot contain zero files");
    for (int i = 0; i < filesCount; i++)
    {
        var name = reader.ReadString();
        files.Add(name);
        var len = reader.ReadInt64();
        using (var file = File.Create(Path.Combine(basePath, name)))
        {
            file.SetLength(len);
            long totalFileRead = 0; // long, so files larger than 2 GB don't overflow the counter
            while (totalFileRead < len)
            {
                var read = stream.Read(buffer, 0, (int) Math.Min(buffer.Length, len - totalFileRead));
                if (read == 0)
                    throw new EndOfStreamException();
                totalFileRead += read;
                file.Write(buffer, 0, read);
            }
        }
    }
            
    new FullBackup().Restore(Path.Combine(basePath, files[0]), basePath);

    var options = StorageEnvironmentOptions.ForPath(basePath);
    options.IncrementalBackupEnabled = true;
    //TODO: Copy any other customizations that might have happened on the options

    new IncrementalBackup().Restore(options, files.Skip(1));

    _storageEnvironment = new StorageEnvironment(options);

    using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
    {
        var metadata = tx.ReadTree("$metadata");
        metadata.Add("last-index", EndianBitConverter.Little.GetBytes(index));
        LastAppliedIndex = index;
        tx.Commit();
    }
}

Unpack the snapshots from the stream, then apply the full backup first, followed by all the incremental backups. Make sure to update the last applied index, and we are set.
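The framing that both sides agree on is: an Int32 file count, then for each file a length-prefixed name, an Int64 length and the raw bytes. Here is a minimal in-memory sketch of that framing, so the round trip can be checked without Voron or real files. SnapshotFramingSketch is a name invented for the illustration, and it buffers whole files in memory, which the real code deliberately avoids.

```csharp
using System.Collections.Generic;
using System.IO;

public static class SnapshotFramingSketch
{
    public static void Write(Stream output, IList<KeyValuePair<string, byte[]>> files)
    {
        var writer = new BinaryWriter(output);
        writer.Write(files.Count);                  // Int32: number of files
        foreach (var file in files)
        {
            writer.Write(file.Key);                 // length-prefixed file name
            writer.Write((long)file.Value.Length);  // Int64: payload length
            writer.Write(file.Value);               // raw payload bytes, no prefix
        }
        writer.Flush();
    }

    public static List<KeyValuePair<string, byte[]>> Read(Stream input)
    {
        var reader = new BinaryReader(input);
        var count = reader.ReadInt32();
        var files = new List<KeyValuePair<string, byte[]>>();
        for (var i = 0; i < count; i++)
        {
            var name = reader.ReadString();
            var length = reader.ReadInt64();
            // The sketch only handles payloads that fit in a byte array.
            var payload = reader.ReadBytes((int)length);
            if (payload.Length != length)
                throw new EndOfStreamException();   // the stream ended in the middle of a file
            files.Add(new KeyValuePair<string, byte[]>(name, payload));
        }
        return files;
    }
}
```

Writing two entries to a MemoryStream, rewinding it and reading it back should return the same names and bytes in the same order, full backup first.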

Tail/Feather–The client API

As I mentioned, Tail/Feather is a weekend project to test out how stuff works for real. After creating the highly available distributed key/value store, we now need to actually build a client API for it.

Externally, that API is going to look like this:

public class TailFeatherClient : IDisposable
{
    public TailFeatherClient(params Uri[] nodes);

    public Task Set(string key, JToken value);

    public Task<JToken> Get(string key);

    public Task Remove(string key);

    public void Dispose();
}

If this wasn’t a weekend project, I would add batch support, but that isn’t important for our purposes right now. The API itself is pretty stupid, which is great, but what about the actual behavior?

We want it to be able to handle dynamic cluster changes, and we need it to be smart about it. A lot of that is shared among all operations, so the next layer of the API is:

public Task Set(string key, JToken value)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/set?key={0}&val={1}",
        Uri.EscapeDataString(key), Uri.EscapeDataString(value.ToString(Formatting.None)))));
}

public async Task<JToken> Get(string key)
{
    // reads go to the read endpoint, not to del (which would remove the key)
    var reply = await ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/read?key={0}",
        Uri.EscapeDataString(key))));
    var result = JObject.Load(new JsonTextReader(new StreamReader(await reply.Content.ReadAsStreamAsync())));

    if (result.Value<bool>("Missing"))
        return null;

    return result["Value"];
}

public Task Remove(string key)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/del?key={0}",
        Uri.EscapeDataString(key))));
}

The actual behavior is in ContactServer:

private readonly ConcurrentDictionary<Uri, HttpClient> _cache = new ConcurrentDictionary<Uri, HttpClient>();
private Task<TailFeatherTopology> _topologyTask;

public TailFeatherClient(params Uri[] nodes)
{
    _topologyTask = FindLatestTopology(nodes);
}

private HttpClient GetHttpClient(Uri node)
{
    return _cache.GetOrAdd(node, uri => new HttpClient { BaseAddress = uri });
}

private async Task<TailFeatherTopology> FindLatestTopology(IEnumerable<Uri> nodes)
{
    // all the requests are started here and run concurrently
    var tasks = nodes.Select(node => GetHttpClient(node).GetAsync("tailfeather/admin/flock")).ToArray();

    var topologies = new List<TailFeatherTopology>();
    foreach (var task in tasks)
    {
        HttpResponseMessage message;
        try
        {
            // await each reply instead of blocking on task.Result
            message = await task;
        }
        catch (HttpRequestException)
        {
            // this node is unreachable, the other nodes may still answer
            continue;
        }
        if (message.IsSuccessStatusCode == false)
            continue;

        topologies.Add(new JsonSerializer().Deserialize<TailFeatherTopology>(
            new JsonTextReader(new StreamReader(await message.Content.ReadAsStreamAsync()))));
    }

    return topologies.OrderByDescending(x => x.CommitIndex).FirstOrDefault();
}

private async Task<HttpResponseMessage> ContactServer(Func<HttpClient, Task<HttpResponseMessage>> operation, int retries = 3)
{
    if (retries < 0)
        throw new InvalidOperationException("Cluster is not reachable, or no leader was selected. Out of retries, aborting.");

    var topology = (await _topologyTask ?? new TailFeatherTopology());

    var leader = topology.AllVotingNodes.FirstOrDefault(x => x.Name == topology.CurrentLeader);
    if (leader == null)
    {
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // now we have a leader, we need to try calling it...
    var httpResponseMessage = await operation(GetHttpClient(leader.Uri));
    if (httpResponseMessage.IsSuccessStatusCode == false)
    {
        // we were sent to a different server, let try that...
        if (httpResponseMessage.StatusCode == HttpStatusCode.Redirect)
        {
            var redirectUri = httpResponseMessage.Headers.Location;
            httpResponseMessage = await operation(GetHttpClient(redirectUri));
            if (httpResponseMessage.IsSuccessStatusCode)
            {
                // we successfully contacted the redirected server, this is probably the leader, let us ask it for the topology,
                // it will be there for next time we access it
                _topologyTask = FindLatestTopology(new[] { redirectUri }.Union(topology.AllVotingNodes.Select(x => x.Uri)));

                return httpResponseMessage;
            }
        }

        // we couldn't get to the server, and we didn't get redirected, we'll check in the cluster in general
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // happy path, we are done
    return httpResponseMessage;
}

There is quite a bit going on here. But the basic idea is simple. Starting from the initial list of nodes we have, contact all of them and find the topology with the highest commit index. That means that it is the freshest, so more likely to be the current one. From the topology, we take the leader, and send all queries to the leader.

If there is any sort of error, we’ll contact all the other servers to find out who we are supposed to be using now. If we can’t find it after three retries, we give up and let the caller sort it out, probably by retrying once the cluster is in a steady state again.
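The "freshest topology wins" rule is easy to look at in isolation. The sketch below uses a stand-in TopologySketch type with just the two fields the rule needs; it is not the real TailFeatherTopology class, whose full shape is not shown in this post.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the topology document that each node returns.
public class TopologySketch
{
    public long CommitIndex;
    public string CurrentLeader;
}

public static class TopologySelectionSketch
{
    // Picks the reply with the highest commit index. Replies from nodes that
    // failed to answer are represented as null and are skipped.
    // Returns null when no node answered at all.
    public static TopologySketch PickFreshest(IEnumerable<TopologySketch> replies)
    {
        return replies
            .Where(reply => reply != null)
            .OrderByDescending(reply => reply.CommitIndex)
            .FirstOrDefault();
    }
}
```

A node that was partitioned away for a while will report an older commit index, so its idea of who the leader is loses out to the answer from a node that is up to date.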

Now, this is really nice, but it falls under the heading of weekend code. That means that this is quite far from what I would call production code. What is missing?

  • Caching the topology locally in a persistent manner so we can restart when the known servers are down from last known good topology.
  • Proper error handling, and in particular, error reporting, to make sure that we can understand what is actually going on.
  • Features such as allowing reads from non leaders, testing, etc.

But overall, I’m quite happy with this.

Tail/Feather–highly available distributed key/value store weekend project

Weekend project means just that, I’m trying some things out, and writing something real is the best way to exercise. This isn’t going to be a full blown project, but it should be functional and usable.

The basic idea, I’m going to build a distributed key/value configuration store. Similar to etcd, this will allow me to explore how to handle full blown Rachis from both server & client sides.

We want this to be a full blown implementation, which means persistence, snapshots, network API, the works.

In terms of the data model, we’ll go for the simplest possible one. A key/value store. A key is a string of up to 128 characters. A value is a JSON formatted value of up to 16 KB. Persistence will be handled by Voron. The persistence side of the project is mostly Voron, so what we are left with is the following:

public enum KeyValueOperationTypes
{
    Add,
    Del
}

public class KeyValueOperation
{
    public KeyValueOperationTypes Type;
    public string Key;
    public JToken Value;
}

public class OperationBatchCommand : Command
{
    public KeyValueOperation[] Batch { get; set; }
}
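Nothing in these types enforces the size limits from the data model, so they would have to be checked somewhere before a command is appended. A minimal sketch of such a check follows. KeyValueLimitsSketch is a made-up helper, and measuring the 16 KB limit as UTF-8 bytes of the serialized JSON is my assumption; the post doesn’t say how the limit is measured.

```csharp
using System;
using System.Text;

public static class KeyValueLimitsSketch
{
    public const int MaxKeyLength = 128;        // characters
    public const int MaxValueBytes = 16 * 1024; // assumed: UTF-8 bytes of the JSON text

    // Throws ArgumentException when the key or the serialized value is out of bounds.
    public static void Validate(string key, string jsonValue)
    {
        if (string.IsNullOrEmpty(key))
            throw new ArgumentException("Key must not be empty", "key");

        if (key.Length > MaxKeyLength)
            throw new ArgumentException("Key is longer than " + MaxKeyLength + " characters", "key");

        // A delete operation carries no value, so null is allowed here.
        if (jsonValue != null && Encoding.UTF8.GetByteCount(jsonValue) > MaxValueBytes)
            throw new ArgumentException("Value is larger than " + MaxValueBytes + " bytes", "jsonValue");
    }
}
```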

This gives us the background for the actual state machine:

public class KeyValueStateMachine : IRaftStateMachine
{
    readonly StorageEnvironment _storageEnvironment;

    public KeyValueStateMachine(StorageEnvironmentOptions options)
    {
        _storageEnvironment = new StorageEnvironment(options);
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            _storageEnvironment.CreateTree(tx, "items");
            var metadata = _storageEnvironment.CreateTree(tx, "$metadata");
            var readResult = metadata.Read("last-index");
            if (readResult != null)
                LastAppliedIndex = readResult.Reader.ReadLittleEndianInt64();
            tx.Commit();
        }
    }

    public event EventHandler<KeyValueOperation> OperatonExecuted;

    protected void OnOperatonExecuted(KeyValueOperation e)
    {
        var handler = OperatonExecuted;
        if (handler != null) handler(this, e);
    }

    public JToken Read(string key)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
        {
            var items = tx.ReadTree("items");

            var readResult = items.Read(key);

            if (readResult == null)
                return null;

            return JToken.ReadFrom(new JsonTextReader(new StreamReader(readResult.Reader.AsStream())));
        }
    }

    public long LastAppliedIndex { get; private set; }

    public void Apply(LogEntry entry, Command cmd)
    {
        var batch = (OperationBatchCommand)cmd;
        Apply(batch.Batch, cmd.AssignedIndex);
    }

    private void Apply(IEnumerable<KeyValueOperation> ops, long commandIndex)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            var items = tx.ReadTree("items");
            var metadata = tx.ReadTree("$metadata");
            metadata.Add("last-index", EndianBitConverter.Little.GetBytes(commandIndex));
            var ms = new MemoryStream();
            foreach (var op in ops)
            {
                switch (op.Type)
                {
                    case KeyValueOperationTypes.Add:
                        ms.SetLength(0);

                        var streamWriter = new StreamWriter(ms);
                        op.Value.WriteTo(new JsonTextWriter(streamWriter));
                        streamWriter.Flush();

                        ms.Position = 0;
                        items.Add(op.Key, ms);
                        break;
                    case KeyValueOperationTypes.Del:
                        items.Delete(op.Key);
                        break;
                    default:
                        throw new ArgumentOutOfRangeException();
                }
                OnOperatonExecuted(op);
            }

            tx.Commit();
        }
    }

    public void Dispose()
    {
        if (_storageEnvironment != null)
            _storageEnvironment.Dispose();
    }
}

As you can see, there isn’t much here. Not surprising, since we are storing a key/value data structure. I’m also ignoring snapshots for now. That is good enough for now, let us go for the network portion of the work. We are going to be using Web API for the network stuff. And we’ll be initializing it like so:

var nodeName = options.NodeName ?? (Environment.MachineName + ":" + options.Port);

var kvso = StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "KeyValue"));
using (var statemachine = new KeyValueStateMachine(kvso))
{
    using (var raftEngine = new RaftEngine(new RaftEngineOptions(
        new NodeConnectionInfo
        {
            Name = nodeName,
            // the same Uri property that the controllers below read and set
            Uri = new Uri("http://" + Environment.MachineName + ":" + options.Port),
        },
        StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "Raft")),
        new HttpTransport(nodeName),
        statemachine
        )))
    {
        using (WebApp.Start(new StartOptions
        {
            Urls = { "http://+:" + options.Port + "/" }
        }, builder =>
        {
            var httpConfiguration = new HttpConfiguration();
            RaftWebApiConfig.Register(httpConfiguration);
            httpConfiguration.Properties[typeof(HttpTransportBus)] = new HttpTransportBus(nodeName);
            httpConfiguration.Properties[typeof(RaftEngine)] = raftEngine;
            builder.UseWebApi(httpConfiguration);
        }))
        {
            Console.WriteLine("Ready & processing requests, press ENTER to stop");

            Console.ReadLine();
        }
    }
}

Note that we need to initialize both the state machine and the raft engine, then wire the raft engine controllers. Now we are pretty much done with setup, and we can turn to the actual semantics of running the cluster. The first thing that I want to do is to setup the baseline, so we create this base controller:

public abstract class TailFeatherController : ApiController
{
    public KeyValueStateMachine StateMachine { get; private set; }
    public RaftEngine RaftEngine { get; private set; }

    public override async Task<HttpResponseMessage> ExecuteAsync(HttpControllerContext controllerContext, CancellationToken cancellationToken)
    {
        RaftEngine = (RaftEngine)controllerContext.Configuration.Properties[typeof(RaftEngine)];
        StateMachine = (KeyValueStateMachine)RaftEngine.StateMachine;
        try
        {
            return await base.ExecuteAsync(controllerContext, cancellationToken);
        }
        catch (NotLeadingException)
        {
            var currentLeader = RaftEngine.CurrentLeader;
            if (currentLeader == null)
            {
                return new HttpResponseMessage(HttpStatusCode.PreconditionFailed)
                {
                    Content = new StringContent("{ 'Error': 'No current leader, try again later' }")
                };
            }
            var leaderNode = RaftEngine.CurrentTopology.GetNodeByName(currentLeader);
            if (leaderNode == null)
            {
                return new HttpResponseMessage(HttpStatusCode.InternalServerError)
                {
                    Content = new StringContent("{ 'Error': 'Current leader " + currentLeader + " is not found in the topology. This should not happen.' }")
                };
            }
            return new HttpResponseMessage(HttpStatusCode.Redirect)
            {
                Headers =
                {
                    Location = leaderNode.Uri
                }
            };
        }
    }
}

That is a lot of error handling, but basically it just gets the right values from the configuration and exposes them to the controller actions, and then handles the case where a command that requires a leader hits a follower.
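The three outcomes of hitting a follower can be summarized as a small decision table. The sketch below models the topology as a plain name-to-URL dictionary, which is a simplification I’m making for the illustration and not the Rachis topology type.

```csharp
using System.Collections.Generic;
using System.Net;

public static class NotLeaderResponseSketch
{
    // Decides what a follower answers when it gets a command that only the leader can handle.
    // location is set only when the caller should be redirected.
    public static HttpStatusCode Decide(string currentLeader, IDictionary<string, string> topology, out string location)
    {
        location = null;

        // No leader right now (an election may be in progress): ask the caller to retry later.
        if (currentLeader == null)
            return HttpStatusCode.PreconditionFailed;

        // We know the leader's name but not its address: that is a server side problem.
        string leaderUrl;
        if (topology.TryGetValue(currentLeader, out leaderUrl) == false)
            return HttpStatusCode.InternalServerError;

        // Normal case: point the caller at the leader.
        location = leaderUrl;
        return HttpStatusCode.Redirect;
    }
}
```

On the client side, the redirect is the only one of these answers that can be acted on immediately; the other two end up in the retry path.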

Next step, actually managing the cluster, here we go:

public class AdminController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/admin/fly-with-us")]
    public async Task<HttpResponseMessage> Join([FromUri] string url, [FromUri] string name)
    {
        var uri = new Uri(url);
        name = name ?? uri.Host + (uri.IsDefaultPort ? "" : ":" + uri.Port);

        await RaftEngine.AddToClusterAsync(new NodeConnectionInfo
        {
            Name = name,
            Uri = uri
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }

    [HttpGet]
    [Route("tailfeather/admin/fly-away")]
    public async Task<HttpResponseMessage> Leave([FromUri] string name)
    {
        await RaftEngine.RemoveFromClusterAsync(new NodeConnectionInfo
        {
            Name = name
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }
}

So now we have a way to add and remove nodes from the cluster, which is all the admin stuff that we need to handle right now. Next, we need to actually wire the operations, this is done here:

public class KeyValueController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/key-val/read")]
    public HttpResponseMessage Read([FromUri] string key)
    {
        var read = StateMachine.Read(key);
        if (read == null)
        {
            return Request.CreateResponse(HttpStatusCode.NotFound, new
            {
                RaftEngine.State,
                Key = key,
                Missing = true
            });
        }
        return Request.CreateResponse(HttpStatusCode.OK, new
        {
            RaftEngine.State,
            Key = key,
            Value = read
        });
    }

    [HttpGet]
    [Route("tailfeather/key-val/set")]
    public Task<HttpResponseMessage> Set([FromUri] string key, [FromUri] string val)
    {
        JToken jVal;
        try
        {
            jVal = JToken.Parse(val);
        }
        catch (JsonReaderException)
        {
            jVal = val;
        }

        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Add,
            Value = jVal
        };

        return Batch(new[] { op });
    }

    [HttpGet]
    [Route("tailfeather/key-val/del")]
    public Task<HttpResponseMessage> Del([FromUri] string key)
    {
        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Del,
        };

        return Batch(new[] { op });
    }

    [HttpPost]
    [Route("tailfeather/key-val/batch")]
    public async Task<HttpResponseMessage> Batch()
    {
        var stream = await Request.Content.ReadAsStreamAsync();
        var operations = new JsonSerializer().Deserialize<KeyValueOperation[]>(new JsonTextReader(new StreamReader(stream)));

        return await Batch(operations);
    }

    private async Task<HttpResponseMessage> Batch(KeyValueOperation[] operations)
    {
        var taskCompletionSource = new TaskCompletionSource<object>();
        RaftEngine.AppendCommand(new OperationBatchCommand
        {
            Batch = operations,
            Completion = taskCompletionSource
        });
        await taskCompletionSource.Task;

        return Request.CreateResponse(HttpStatusCode.Accepted);
    }
}

And we are pretty much set.

Note that I’ve been writing this post while I’m writing the code, so I’ve made some small changes, you can see actual code here.

Anyway, we are pretty much done. Now we can compile and try testing what is going on.

First, we seed the cluster, by running:

.\TailFeather.exe --port=9079 --DataPath=One --Name=One --Bootstrap

This tells us that this node is allowed to become a leader without having to pre-configure a cluster. This command runs and exits, so now we’ll run three such copies:

  • start .\TailFeather.exe "--port=9079 --DataPath=One --Name=One"
  • start .\TailFeather.exe "--port=9078 --DataPath=Two --Name=Two"
  • start .\TailFeather.exe "--port=9077 --DataPath=Three --Name=Three"

We have all three nodes up and running, so now is the time to actually make use of it:

http://localhost:9079/tailfeather/key-val/set?key=ravendb&val={ 'Url': 'http://live-test.ravendb.net', 'Database': 'Sample' }

In this case, you can see that we are setting a configuration value to point to a RavenDB database on the first node. Note that at this point, we have a single node cluster, and the two other are waiting to join it, but are taking no action.
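The URL above contains spaces, quotes and a nested URL, which a browser will escape for you. When issuing the same request from code, the key and the value need to be escaped explicitly, the same way the client API does it with Uri.EscapeDataString. A minimal sketch, where SetUrlSketch and BuildSetUrl are names invented for the illustration:

```csharp
using System;

public static class SetUrlSketch
{
    // Builds the URL for the set endpoint, escaping both the key and the JSON value
    // so that characters such as spaces, quotes, '&' and '/' survive the trip.
    public static Uri BuildSetUrl(Uri node, string key, string jsonValue)
    {
        var relative = string.Format("tailfeather/key-val/set?key={0}&val={1}",
            Uri.EscapeDataString(key),
            Uri.EscapeDataString(jsonValue));

        // Resolves the relative part against the node address, e.g. http://localhost:9079
        return new Uri(node, relative);
    }
}
```

Calling BuildSetUrl(new Uri("http://localhost:9079"), "ravendb", "{ 'Url': 'http://live-test.ravendb.net', 'Database': 'Sample' }") gives a URL that can be passed to HttpClient.GetAsync as is.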

We can get the value back using:

image

So far, so good. Now, let us add a second node in by inviting it to fly with our cluster. We do that using the following command:

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9078&name=Two

Which will give us:

image

Note that we are using the just-added node to look at this.

Next, we can add the third node.

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9077&name=Three

I would put the image in, but I think you get the point.

This is it for now. We have a highly available persistent & distributed key/value store. Next, we need to tackle the idea of snapshots and the client API, but I’ll deal with that in another post.

Introducing Rachis: Raven’s Raft implementation

Rachis, def: Spinal column, also the distal part of the shaft of a feather that bears the web.

Rachis is the name we picked for RavenDB’s Raft implementation. Raft is a consensus protocol that can actually be understood without resorting to Greek philosophy. You can read all about it here (there is a very cool interactive visualization there). I would also like to thank Diego Ongaro for both the Raft paper and a lot of help while I tried to understand the finer points of it.

Why Raft?

Raft is a distributed consensus protocol. It allows you to reach an ordered set of operations across your entire cluster. This means that you can apply a set of operations on a state machine, and have the same final state on all nodes in the cluster. It is also drastically simpler to understand than Paxos, which is the better known alternative.
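Why the ordering matters is easy to show with a toy state machine. This sketch has nothing to do with the Rachis API: it models the log as (key, value) pairs, treats a null value as a delete, and is only meant to show that replaying the same log gives the same state while a reordered log may not.

```csharp
using System;
using System.Collections.Generic;

public static class OrderedLogSketch
{
    // Each entry is (key, value); a null value means "delete the key".
    public static SortedDictionary<string, string> Apply(IEnumerable<Tuple<string, string>> log)
    {
        var state = new SortedDictionary<string, string>();
        foreach (var entry in log)
        {
            if (entry.Item2 == null)
                state.Remove(entry.Item1);       // removing a missing key is a no-op
            else
                state[entry.Item1] = entry.Item2;
        }
        return state;
    }
}
```

Applying "set a, then delete a" leaves an empty state on every node that replays it, while "delete a, then set a" leaves a behind. Agreeing on the order of the log is what lets all the nodes agree on the final state.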

What is Rachis?

Well, it is a Raft implementation. To be rather more exact, it is a Raft implementation with the following features:

  • (Obviously) the ability to manage a distributed set of state machines and reliably commit updates to said state machines.
  • Dynamic topology (nodes can join and leave the cluster on the fly, including state sync).
  • Large state machines (snapshots, efficient transfers, non voting members).
  • ACID local log using Voron.
  • Support for in memory and persistent state machines.
  • Support for voting & non voting members.
  • A lot of small tweaks for best behavior in strange situations (forced step down, leader timeout and many more).

What are you going to use this for?

To reach a consensus, of course. More to the point, we have some really nice ideas about where this is going to allow us to do some really nice stuff. In particular, we want to use that as the backbone for the event and time series replication models.

But I’m getting ahead of myself. Before we do that, I want to build a simple reliable distributed service. We’ll call it Tail/Feather and it will be awesome, in a weekend project kind of way. I’ll post full details about this in my next post.

Where can I look at it?

The current version is here, note that you’ll need to pull Voron as well (from the ravendb repository) to get it working.

How does this work?

You can read the Raft paper and the full thesis, of course, but there are some subtleties that I had to work through (with great help from Diego), so it is worth going into a bit more detail.

Clusters are typically composed of an odd number of servers (3, 5 or 7), which can communicate freely with one another. The startup process for a cluster requires us to designate a single server as the seed. This is the only server that can become the cluster leader during the cluster bootstrap process. This is done to make sure that during startup, before we had the chance to tell the servers about the cluster topology, they won’t consider themselves a single node cluster and start accepting requests before we add them to the cluster.
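The preference for odd cluster sizes follows from the fact that Raft needs a majority of the voting nodes to commit anything. A quick sketch of the arithmetic (QuorumSketch is a name made up for the illustration):

```csharp
public static class QuorumSketch
{
    // The smallest number of voting nodes that forms a majority.
    public static int Majority(int votingNodes)
    {
        return votingNodes / 2 + 1;
    }

    // How many voting nodes can be down while a majority is still reachable.
    public static int TolerableFailures(int votingNodes)
    {
        return votingNodes - Majority(votingNodes);
    }
}
```

A cluster of 3 can lose 1 node and a cluster of 5 can lose 2, but a cluster of 4 can still lose only 1, so the extra even node adds cost without adding fault tolerance.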

Only the seed server will become the leader, and all the others will wait for instructions. We can then let the seed server know about the other nodes in the cluster. It will initiate a join operation, which will reach out to the other node and set up the appropriate cluster topology. At that point, all the servers are on equal footing, and there is no longer any meaningful distinction between them. The notion of a seed node is only relevant for cluster bootstrap; once that is done, all servers have the same configuration.

Dynamically adding and removing nodes from the cluster

Removing a node from the cluster is a simple process. All we need to do is to update the cluster topology, and we are done. The removed server will get a notification to let it know that it has been disconnected from the cluster, and will move itself to a passive state (note that it is okay if it doesn’t get this notification, we are just being nice about it).

The process of adding a server is a bit more complex. Not only do we have to add a new node, we need to make sure that it has the same state as all the other nodes in the cluster. In order to do that, we handle it in multiple stages. A node added to the cluster can be in one of three states: Voting (full member of the cluster, able to become a leader), Non Voting (just listening to what is going on, can’t be a leader), Promotable (getting up to speed with the cluster state). Non voting members are a unique case, they are there to enable some advanced scenarios (cross data center communication, as we currently envision it).

Promotable is a lot more interesting. Adding a node to an existing cluster can be a long process, especially if we are managing a lot of data. In order to handle that, we add the server to the promotable category, and start sending it the state it needs to catch up with the rest of the cluster. Once it has caught up with the cluster (it has all the committed entries in the cluster), we will automatically move it to the voting members in the cluster.
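As a rough mental model, the promotion step can be pictured like this. The types and member names below are hypothetical and are not taken from the Rachis code base; the only rule they encode is the one stated above, that a promotable node becomes a voting member once it holds all the committed entries.

```csharp
public enum MemberState
{
    Promotable,
    Voting,
    NonVoting
}

// Hypothetical view of a cluster member, as the leader might track it.
public class ClusterMemberSketch
{
    public string Name;
    public MemberState State;
    public long MatchIndex; // highest log index known to be replicated to this node

    // Called (in this sketch) after each replication round with the node's progress
    // and the leader's current commit index.
    public void OnReplicated(long matchIndex, long leaderCommitIndex)
    {
        MatchIndex = matchIndex;

        // Only promotable nodes are upgraded; non voting members stay as they are.
        if (State == MemberState.Promotable && MatchIndex >= leaderCommitIndex)
            State = MemberState.Voting;
    }
}
```

In the real implementation the topology change itself also has to go through the cluster, which this sketch ignores.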

Note that it is fine for crashes to happen throughout this process. The cluster leader can crash during this, and we’ll recover and handle this properly.

Normal operations

During normal operations, there is going to be a leader that is going to be accepting all the requests for the cluster, and handle committing them cluster wide. During those operations, you can spread reads across members in the cluster, for better performance.

Now, if you don’t mind, I’m going to be writing Tail/Feather now, and see how long it takes.

NHibernate & Entity Framework Profiler 3.0

It may not get enough attention, but we have been working on the profilers as well during the past few months.

TL;DR: You can get the next generation of NHibernate Profiler and Entity Framework Profiler now, lots of goodies to look at!

I’m sure that a lot of people would be thrilled to hear that we dropped Silverlight in favor of going back to WPF UI. The idea was that we would be able to deploy anywhere, including in production. But Silverlight just made things harder all around, and customers didn’t like the production profiling mode.

Production Profiling

We have changed how we profile in production. You can now make the following call in your code:

NHibernateProfiler.InitializeForProduction(port, password);

And then connect to your production system:

image

At which point you can profile what is going on in your production system safely and easily. The traffic between your production server and the profiler is SSL encrypted.

NHibernate 4.x and Entity Framework vNext support

The profilers now support the latest versions of NHibernate and Entity Framework. That includes profiling async operations, better suitability for modern apps, and more.

New SQL Paging Syntax

We now properly support SQL Server paging syntax:

select * from Users
order by Name
offset 0 /* @p0 */ rows fetch next 250 /* @p1 */ rows only

This is great for NHibernate users, who finally can have a sane paging syntax as well as beautiful queries in the profiler.

At a glance view

A lot of the time, you don’t want the profiler to be front and center, you want to just run it and have it there to glance at once in a while. The new compact view gives you just that:

image

You can park it at some point in your screen and work normally, glancing to see if it found anything. This is much less distracting than the full profiler for normal operations.

Scopes and groups

When we started working on the profilers, we followed the “one session per request” rule, and that was pretty good. But a lot of people, especially in the Entity Framework group, are using multiple sessions or data contexts in a single request, and they still want the ability to see the operations in a request at a glance. We now allow you to group things, like this:

image

By default, we use the current request to group things, but we also give you the ability to define your own scopes. So if you are profiling an NServiceBus application, you can set the scope to your message handling by setting ProfilerIntegration.GetCurrentScopeName or explicitly calling ProfilerIntegration.StartScope whenever you want.

Customized profiling

You can now surface troublesome issues directly from your code. If you have an issue with a query, you can mark it for attention using CustomQueryReporting.ReportError(), which will flag it in the UI for further investigation.

You can also just mark interesting pieces in the UI without an error, like so:

using (var db = new Entities(conStr))
{
    var post1 = db.Posts.FirstOrDefault();

    using (ProfilerIntegration.StarStatements("Blue"))
    {
        var post2 = db.Posts.FirstOrDefault();
    }

    var post3 = db.Posts.FirstOrDefault();
    
    ProfilerIntegration.StarStatements();
    var post4 = db.Posts.FirstOrDefault();
    ProfilerIntegration.StarStatementsClear();

    var post5 = db.Posts.FirstOrDefault();
}

Which will result in:

image

Disabling profiler from configuration

You can now disable the profiler by setting:

<add key="HibernatingRhinos.Profiler.Appender.NHibernate" value="Disabled" />

This will avoid initializing the profiler, obviously. The intent is that you can set up production profiling, disable it by default, and enable it selectively if you need to actually figure things out.

Odds & ends

We moved to WebActivatorEx from the deprecated WebActivator, added an XML file for the appender, and fixed a whole bunch of small bugs, the most important among them being:

image

Linq to SQL, Hibernate and LLBLGen Profilers, RIP

You might have noticed that I talked only about the NHibernate and Entity Framework Profilers. The sales for the rest weren’t what we hoped they would be, and we are no longer going to sell them.

Go get them, there is a new release discount

You can get the NHibernate Profiler and Entity Framework Profiler for a 15% discount for the next two weeks.

The process of performance problem fixes with RavenDB

This post isn’t so much about this particular problem, but about the way we solved this.

We have a number of ways to track performance problems, but this is a good example: we can see that, for some reason, this test failed because it took too long to run:

image

To handle that, I don’t want to keep running the test; I don’t actually care that much about the test itself. What I wanted was to be able to run this scenario independently.

To do that, I added:

image

This opens up the studio with all the data that we have for this test. Which is great, since this means that we can export the data.

image

That done, we can import it to an instance that we control, and start testing the performance. In particular, we can run it under a profiler, to see what it is doing.

The underlying reason ended up being an issue with how we flush things to disk, which was easily fixed once we could narrow it down. The problem was just getting it working in a reproducible manner. This approach, being able to just stop midway through a test and capture the full state of the system, is invaluable in troubleshooting what is going on.

Transactions are a figment of your imagination

This post is in response to a few comments here. In particular, I get the sense that people expect businesses and systems to behave as if transactions are a real thing. The only problem here is that this is quite far from the truth.

Let me define what I mean by a transaction here. I’m not talking about database transactions, ACID or any such thing. I’m talking about the notion that any interaction between a user and a business, or between two businesses can actually be modeled with the notion of a transaction similar to what we see in databases.

That is, that we have an interaction that would either be all there, or won’t be there at all. The most obvious example is the notion of a financial transaction, the notion that we debit an account and then we credit another account. And we have to do that in such a way that either both accounts were modified or none of them were modified. That is the classic example for database transactions, and it is wrong, as anyone who ever wrote a check or sent a wire transfer can tell you. A good discussion on how that actually works can be found here. Note that in this case, the way money transfer works, in the real world, is that you upload a file over FTP, then wait three to five days to see if the orders you sent were rejected.

Another example is the notion of accepting an order, in a transactional manner. If I accepted your order, after verifying that I have reserved everything, and my warehouse burned down, what do I do with your order? I can hardly roll it back.

To move away from businesses, let us consider doing something as unimportant as voting in a national election. Logically speaking, this is a pretty simple process. Identify the voter, record their vote, total the votes, select winner. Except that you can go back and force a re-election in a particular district if such is needed, or you might find a box of lost votes, or any of a hundred evil little things that crop up in the real world.

Any attempt to model such systems in neat transactional boxes with “all in or none at all” is going to break.

Lies, Service Level Agreements, Trust and failure modes

I had a very interesting discussion with Kelly Sommers on Twitter. But 140 characters isn’t really enough to explain things. Also, it is an interesting topic in general.

Kelly disagreed with this post: http://www.shopify.ca/technology/14909841-kafka-producer-pipeline-for-ruby-on-rails

image

You can read the full discussion here.

The basic premise is that there is a highly reliable distributed queue that is used to process messages, but because they didn’t have operational experience with it, they used a local queue to store the messages before sending them over the network. Kelly seems to think that this is decreasing reliability. I disagree.

The underlying question is simple: when do you consider it acceptable to lose a message? If returning an error to the client is fine, sure, go ahead and do that if you can’t reach the cluster. But if you are currently processing a 10 million dollar order, that is going to kinda suck, and anything that you can do to reduce the chance of that happening is good. Note the key part in this statement: we can only reduce the chance of this happening, we can’t ensure it.

One way to try that is to get a guaranteed SLA from the distributed queue. Once we have that, we can rely on it working. This seems to be what Kelly is aiming at:

image

And that is true, if you could rely on SLAs. Just this week we had a multi hour, multi region Azure outage. In fact, outages, including outages that violate SLAs, are unfortunately common.

In fact, if we look at recent history, we have:

There are actually more of them, but I think that 5 outages in 2 years is enough to show a pattern.

And note specifically that I’m talking about global outages like the ones above. Most people don’t realize that complex systems operate in a constant mode of partial failure. If you ever read an accident investigation report, you’ll know that there is almost never just a single cause of failure. For example, the road was slippery and the driver was speeding and the ABS system failed and the road barrier foundation had rotted since being installed. Even a single change in one of those would turn the accident from a fatal crash into a “didn’t happen” or a “honey, we need a new car”.

You can try to rely on the distributed queue in this case, because it has an SLA. And Toyota also promises that your car won’t suddenly accelerate into a wall, but if you had a Toyota Camry in 2010… well, you know…

From my point of view, saving the data locally before sending over the network makes a lot of sense. In general, the local state of the machine is much more stable than the network. And if there is an internal failure in the machine, it is usually too hosed to do anything about anyway. I might try to write to disk, and write to the network even if I can’t do that, because I want to do my utmost to not lose the message.

Now, let us consider the possible failure scenarios. I’m starting all of them with the notion that I just got a message for a 10 million dollar order, and I need to send it to the backend for processing.

  1. We can’t communicate with the distributed queue. That can be because it is down, hopefully that isn’t the case, but from our point of view, if our node became split from the network, this has the same effect. We are writing this down to disk, so when we become available again, we’ll be able to forward the stored message to the distributed queue.
  2. We can’t communicate with the disk, maybe it is full, or there is an error, or something like that. We can still talk to the network, so we place it in the distributed queue, and we go on with our lives.
  3. We can’t communicate with the disk, we can’t communicate with the network. We can’t keep it in memory (we might overflow the memory), and anyway, if we are out of disk and network, we are probably going to be rebooted soon anyway. SOL, there is nothing else we can do at this point.

Note that the first case assumes that we actually do come back up. If the admin just blew this node away, then the data on that node isn’t coming back, obviously. But since the admin knows that we are storing things locally, s/he will at least try to restore the data from that machine.

We are safer (not safe, but safer than without it). The question is whether this is worth it. If your messages aren’t actually carrying financial information, you can probably just drop a few messages as long as you let the end user know about that, so they can retry. If you really care about each individual message, if it is important enough to go the extra few miles for it, then the store and forward model gives you a measurable level of extra safety.
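
The three failure scenarios above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: it is not the Shopify or Kafka code, the class and callback names are made up, and an in-memory deque stands in for messages that were durably written to local disk.

```python
from collections import deque


class StoreAndForward:
    """Try local storage first, fall back to the network, give up only if both fail."""

    def __init__(self, write_local, send_remote):
        # Hypothetical callbacks: write_local raises OSError on a disk failure,
        # send_remote raises ConnectionError when the distributed queue is unreachable.
        self.write_local = write_local
        self.send_remote = send_remote
        self.pending = deque()  # stand-in for messages persisted on local disk

    def accept(self, message):
        try:
            self.write_local(message)
        except OSError:
            # Scenario 2: the disk is unavailable, so go straight to the network.
            try:
                self.send_remote(message)
                return "sent"
            except ConnectionError:
                # Scenario 3: no disk and no network, nothing else we can do.
                return "lost"
        self.pending.append(message)
        # Scenario 1 is covered here: if the network is down, the message stays
        # in the local store until a later flush succeeds.
        return "forwarded" if self.flush() else "stored"

    def flush(self):
        """Forward stored messages in order; stop at the first network failure."""
        while self.pending:
            try:
                self.send_remote(self.pending[0])
            except ConnectionError:
                return False
            self.pending.popleft()
        return True
```

A message is only removed from the local store after the send succeeded, so a crash between the two steps can result in a duplicate send, but not in a lost message. That means the backend has to tolerate duplicates.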

The bug that ruined my weekend

This is bloody strange. I have a test failing in our unit tests, which isn’t an all too uncommon occurrence after a big chunk of work. The only problem is that this test shouldn’t fail, no one touched this part.

For reference, here is the commit where this is failing. You can reproduce this by running the Raven.Tryouts console project.

Note that it has to be done in Release mode. When that happens, we consistently get the following error:

Unhandled Exception: System.NullReferenceException: Object reference not set to an instance of an object.
at Raven.Client.Connection.MultiGetOperation.<TryResolveConflictOrCreateConcurrencyException>d__b.MoveNext() in c:\Work\ravendb\Raven.Client.Lightweight\Connection\MultiGetOperation.cs:line 156
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)

Here is the problem with this stack trace:

image

Now, this only happens in release mode, but it happens there consistently. I’ve verified that this isn’t an issue of me running an old version of the code. So this shouldn’t be possible. There is no concurrency going on, all the data this method is touching is only touched by this thread.

What is more, the exception is not thrown from inside the foreach loop in line 139. I’ve verified that by putting a try catch around the inside of the loop and still getting the NRE thrown outside it. In fact, I have tried to figure it out in pretty much any way I can. Attaching a debugger makes the problem disappear, as does disabling optimization, or anything like that.

In fact, at this point I’m going to throw up my hands in disgust, because this is not something that I can figure out. Here is why: this is my “fix” for this issue. I replaced the following failing code:

image

With the following passing code:

image

And yes, this should make zero difference to the actual behavior, but it does. I’m suspecting a JIT issue.

Solve this bug!

This took some head scratching before I figured it out.

image

And here is the relevant code:

image

Beyond RavenDB 3.0: The future road map for RavenDB

We are pretty much done with RavenDB 3.0. We are waiting for fixes to internal apps we use to process orders and support customers, and then we can actually make a release. In the meantime, that means that we need to start looking beyond the 3.0 release. We had a few people internally focus on post 3.0 work for the past few months, and we have a rough outline of what we have done there. Primarily we are talking about better distribution and storage models.

Storage models – the polyglot database

Under this umbrella we put dedicated database engines to support specific needs. We are talking about distributed counters (high scale out, rapid throughput), time series and event store as the primary areas that we are focused on. For example, the counters stuff is pretty much complete, but we didn’t have time to actually make that into a fully mature product.

I talked about this several times in the past, so I’ll not get into too many details here.

Distribution models

We have been working on a Raft implementation for the past few months, and it is now at the stage where we are starting to integrate it into the rest of our software. Raft is planned to be the core replication protocol for the time series and events databases. But you are probably going to see it first as a topology super layer for RavenDB and RavenFS.

Distributed topology management

Replication support in RavenDB and RavenFS follows the multi master model. You can write to any node, and your write will be distributed by the server to all the nodes. This has several advantages, in particular, the fact that we can operate in a disconnected or partially disconnected manner, and that we need little coordination between clients to get everything working. It also has the disadvantage of allowing conflicts. In fact, if you are writing to multiple replicating nodes, and aren’t careful about how you are splitting writes, you are pretty much guaranteed to have conflicts. We have repeatedly heard that this is both a good thing and something that customers really don’t want to deal with.

It is a good thing because we don’t have data loss, it is a bad thing because if you aren’t ready to handle this, some of your data is inaccessible because of the conflict until it is resolved.

Because of that, we are considering implementing a server side topology management system. The actual replication mechanics are going to remain the same. But the difference is how we are going to decide how to work with it.

A cluster (in this case, a set of RavenDB servers and all databases and file systems on them) is composed of cooperating nodes. The cluster is managed via Raft, which is used to store the topology information of the cluster. The topology includes each of the nodes in the system, as well as any of the databases and file systems on the cluster. The cluster will select a leader, and that leader will also be the primary node for writes for all databases. In other words, assume we have a 3 node cluster, and 5 databases in the cluster. All the databases are replicated to all three nodes, and a single node is going to serve as the write primary for all operations.

During normal operations, clients will query any server for the replication topology (and cache that) every 5 minutes or so. If a node is down, we’ll switch over to an alternative node. If the leader is down, we’ll query all other nodes to try to find out who the new leader is, then continue using that leader’s topology from now on. This gives us the advantage that a down server causes clients to switch over and stay switched. That avoids an operational hazard when you bring a down node back up again.

Clients will include the topology version they have in all communication with the server. If the topology version doesn’t match, the server will return an error, and the client will query all nodes it knows about to find the current topology version. It will always choose the latest topology version, and continue from there.
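
To illustrate the version check and the “always choose the latest” rule, here is a minimal Python sketch. This is a design that we are still considering, so none of this is RavenDB code: the `Topology` type, the error type and the `fetch` callback are all assumptions made up for the example.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Tuple


@dataclass(frozen=True)
class Topology:
    version: int
    leader: str
    nodes: Tuple[str, ...]


class StaleTopologyError(Exception):
    """Raised by the server side when the client's topology version does not match."""


def check_request(server_topology: Topology, client_version: int) -> None:
    # Server side: reject any request that carries a different topology version.
    if client_version != server_topology.version:
        raise StaleTopologyError(
            "client has version %d, server has %d"
            % (client_version, server_topology.version)
        )


def refresh_topology(known_nodes: Iterable[str],
                     fetch: Callable[[str], Topology]) -> Topology:
    # Client side: ask every node we know about and keep the highest version.
    # fetch(node) is a hypothetical call that raises ConnectionError if the node is down.
    candidates = []
    for node in known_nodes:
        try:
            candidates.append(fetch(node))
        except ConnectionError:
            continue  # a down node simply doesn't get a vote
    if not candidates:
        raise ConnectionError("no known node is reachable")
    return max(candidates, key=lambda topology: topology.version)
```

The client would call `refresh_topology` after getting a `StaleTopologyError`, cache the result, and retry the request against the leader named in it.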

Note that there is still a chance for conflicts: a leader may become disconnected from the network, but not be aware of that, and accept writes to the database. Another node will take over as the cluster leader and clients will start writing to it. There is a gap where a conflict can occur, but it is a pretty small one, and we have good mechanisms to deal with conflicts, anyway.

We are also thinking about exposing a system similar to the topology for clients directly. Basically, a small distributed and consistent key/value store. Mostly meant for configuration.

Thoughts?

RavenDB Wow! Features presentation

At Oredev, besides sitting in a booth and demoing why RavenDB is cool about one trillion times, I also gave a talk. I intended it to be a demo packed 60 minutes, but then I realized that I only have 40 minutes for the entire thing.

The only thing to do was to speed things up. I think I breathed twice throughout the entire presentation. And I think it went great.

RAVENDB: WOW! FEATURES - THE THINGS THAT YOU DIDN'T KNOW THAT YOUR DATABASE CAN DO FOR YOU from Øredev Conference on Vimeo.

RavenDB 3.0 RTM!

RavenDB 3.0 is out and about!

RavenDB

It is available on our downloads page and on Nuget. You can read all about what is new with RavenDB 3.0 here.

This is a stable release, fully supported. It is the culmination of over a year and a half of work, a very large team and enough improvements to make you dance a jig.

You can play with the new version here, and all of our systems have been running on 3.0 for a while now, of course.

And with that, I’m exhausted, thrilled and very excited. Have fun playing with 3.0, and check back tomorrow to see some of the cool Wow features.

The road to RavenDB 3.0 stable release

We are currently busy shouting at the build cluster to hurry up and finish (it is not impressed by us and keeps chugging on our test suite), but I was quite amused by the following:

image

This is the merge from the 3.0 development branch to the stable branch. That is a lot of goodness coming your way…

Large scale distributed consensus approaches: Concurrent consistent decisions

So far we tackled the idea of a large compute cluster, and a large storage cluster. I mentioned that the problem with the large storage cluster is that it doesn’t handle consistency within itself. Two concurrent requests can hit two storage nodes and make concurrent operations that aren’t synchronized between themselves. That is usually a good thing, since that is what you want for a high throughput system. The less coordination you can get away with, the more you can actually do.

So far, so good, but that isn’t always suitable. Let us consider a case where we need to have a consistent approach, for some business reason. The typical example would be transactions in a bank, but I hate this example, because in the real world banks deal with inconsistency all the time, this is an explicit part of their business model. Let us talk about auctions and bids, instead. We have an auction service, which allows us to run a large number of auctions.

For each auction, users can place bids, and it is important for us that bids are always processed sequentially per auction, because we have to know who placed a bid that is immediately rejected ($1 commission) or a winning bid that was later overbid (no commission except for the actual winner). We’ll leave aside the fact that this is something that we can absolutely figure out from the auction history and say that we need to have it immediate and consistent. How do we go about doing this?

Remember, we have enough load on the system that we are running a cluster with a hundred nodes in it. The rough topology is still this:

image

We have the consensus cluster, which decides on the network topology. In other words, it decides which set of servers is responsible for which auction. What happens next is where it gets interesting.

Instead of just a set of cooperating nodes that share the data between them and each of which can accept both reads and writes, we are going to twist things a bit. Each set of servers is its own consensus cluster for that particular auction. In other words, we first go to the root consensus cluster to get the topology information, then we add another command to the auction’s log. That command goes through the same distributed consensus algorithm between the three nodes. The overall cluster is composed of many consensus clusters, one for each auction.
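
The two steps (ask the root for the topology, then append a command to the auction’s own log) can be sketched like this in Python. Take it as a toy under heavy assumptions: all the names are made up, and a plain in-process list stands in for a log that would really be replicated through a consensus algorithm between the auction’s nodes.

```python
from typing import Dict, List, Tuple


class AuctionLog:
    """Stand-in for one auction's replicated log; commands are applied strictly in order."""

    def __init__(self) -> None:
        self.entries: List[Tuple[str, int, bool]] = []
        self.high_bid = 0

    def append_bid(self, bidder: str, amount: int) -> bool:
        # Because bids are applied one after another, we know immediately
        # whether this bid was rejected or became the current winning bid.
        accepted = amount > self.high_bid
        if accepted:
            self.high_bid = amount
        self.entries.append((bidder, amount, accepted))
        return accepted


class Cluster:
    def __init__(self, assignments: Dict[str, Tuple[str, ...]]) -> None:
        # assignments plays the role of the root consensus cluster's topology:
        # it maps an auction id to the set of nodes responsible for it.
        self.assignments = assignments
        self.logs = {auction: AuctionLog() for auction in assignments}

    def place_bid(self, auction: str, bidder: str, amount: int):
        nodes = self.assignments[auction]  # step 1: topology lookup at the root
        accepted = self.logs[auction].append_bid(bidder, amount)  # step 2: auction log
        return nodes, accepted
```

Each auction only ever touches its own log, which is what lets different auctions proceed independently while bids inside a single auction stay strictly ordered.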

This means that we have a fully consistent set of operations across the entire cluster, even in the presence of failure. Which is quite nice. The problem here is that you have to have a good way to distinguish between the different consensuses. In this case, an auction is the key per consensus, but it isn’t always so easy to make such a distinction, and it is important that an auction cannot grow large enough to overwhelm the set of servers that it is actually using. In those cases, you can’t really do much beyond relaxing the constraints and going in a different manner.

For optimization purposes, you usually don’t run an independent consensus for each of the auctions. Or rather, you do, but you make sure that they share the same communication resources, so for auctions/123 the nodes are D,E,U with E being the leader, while for auctions/321 the nodes are also D,E,U but U is the leader. This gives you the ability to spread processing power among the cluster, and the communication channels (TCP connections, for example) are shared between both auctions’ consensuses.
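
Here is a small Python sketch of that optimization: several auctions on the same three nodes, each with a different leader, all sharing one connection per pair of nodes. The round robin leader assignment is my own simplification for the example (in a real consensus group the leader is elected), and the names are made up.

```python
from itertools import cycle
from typing import Dict, Iterable, Sequence


def assign_leaders(auctions: Iterable[str], nodes: Sequence[str]) -> Dict[str, str]:
    """Spread leadership over the same node set, one auction after another."""
    rotation = cycle(nodes)
    return {auction: next(rotation) for auction in auctions}


class ConnectionPool:
    """One shared channel per pair of nodes, no matter how many auctions use it."""

    def __init__(self) -> None:
        self._connections = {}

    def get(self, node_a: str, node_b: str) -> object:
        key = frozenset((node_a, node_b))  # D<->E and E<->D are the same channel
        if key not in self._connections:
            self._connections[key] = object()  # placeholder for a real TCP connection
        return self._connections[key]

    def __len__(self) -> int:
        return len(self._connections)


leaders = assign_leaders(["auctions/123", "auctions/321"], ["E", "U", "D"])
# auctions/123 is led by E and auctions/321 by U, on the same three nodes.
```

With three nodes there are at most three such channels, regardless of how many auctions those nodes are hosting.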

RavenDB 3.0 RC discount ends in two days

I forgot to mention this explicitly, but we are currently giving a 20% discount on RavenDB 3.0 licenses for the release candidate.

This discount is going to be discontinued with the release of RavenDB 3.0 in two days, so if you are counting on it, hurry up.

RavenDB 3.0 Release date: 25 Nov, 2014

Barring anything major, we’ll be releasing RavenDB 3.0 in 5 days.

It will be a stable release and you’re encouraged to move to it as soon as it is available, using the Esent database.

The Voron database is still in RC mode (mostly because we’re paranoid and want to have more real world experience before we go full forward with this), but it is going to be fully supported.

Upgrading instances will use Esent, and new databases will default to Esent unless you explicitly select Voron.
