time to read 7 min | 1260 words

In my previous post, I talked about the Hybrid Partial View (HyParView) protocol and showed a visualization of how it actually works. Something important to note about this protocol: it is mostly meant to create a gossip topology that is resilient to failure. It is not meant to actually send messages; it serves as the backbone topology (the peer sampling service) for figuring out which nodes exist.

The reason for that can be seen in the following 10 node cluster (after running heartbeats enough times to get to a stable state):

[Image: the 10 node cluster topology after stabilizing]

Let us assume that we want to disseminate a message across the cluster. We select node F as our root, and then send it the message. The rules are as follows:

  • Each node sends the message to all of its active connections (except the sender, of course).
  • A node that got the same message twice will ignore the message.

Based on those rules, and the topology above, we’re going to have the following chain of messages:

  • F – initial broadcast
  • F -> E, G, J
  • E -> G, I
  • G -> E, H
  • J -> I, H, J
  • H -> C, I
  • C -> B, A
  • B -> D, A
  • A -> B, D
  • D -> A

The total number of messages passed is 20, which is twice as many as the optimal solution would generate.

What is worse, this is a very small network, and as the network grows, so will the number of redundant messages. This approach (called eager gossiping) does have a major advantage: because it traverses all paths in the graph, it also traverses all the shortest paths. That means the time to get a message from the origin to all nodes is the smallest possible, but the number of operations is high.

The Plumtree paper (Epidemic Broadcast Trees) presents a solution to this problem. It tries to minimize the number of messages that are passed while still maintaining reliability and keeping the distance a message has to travel short.

The way Plumtree works is explained in beautiful detail in the paper, but the underlying idea goes like this: we start out with the same approach as eager gossiping, but whenever we get a message that we already have, we reply to the source and tell it to stop sending us further messages. The next time a message is sent, we can skip the known duplicate path and reduce the overall number of messages.
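To make the mechanics concrete, here is a minimal sketch of that idea in C# (the actual implementations linked below are in JavaScript; the class and member names here are mine and purely illustrative):

using System;
using System.Collections.Generic;

public class PlumtreeNode
{
    public string Name;
    public HashSet<PlumtreeNode> EagerPeers = new HashSet<PlumtreeNode>();
    public HashSet<PlumtreeNode> LazyPeers = new HashSet<PlumtreeNode>();
    private readonly HashSet<Guid> _seen = new HashSet<Guid>();

    public void Broadcast(Guid id, string payload)
    {
        Receive(id, payload, sender: null);
    }

    public void Receive(Guid id, string payload, PlumtreeNode sender)
    {
        if (_seen.Add(id) == false)
        {
            // duplicate: tell whoever sent this to stop eager-pushing to us
            if (sender != null)
                sender.MoveToLazy(this);
            return;
        }

        Console.WriteLine("{0} delivered {1}", Name, payload);

        // eager push: forward to all eager peers, except whoever we got it from
        foreach (var peer in new List<PlumtreeNode>(EagerPeers))
        {
            if (peer == sender)
                continue;
            peer.Receive(id, payload, this);
        }
    }

    public void MoveToLazy(PlumtreeNode peer)
    {
        // stop sending full messages to this peer, but keep it around as a lazy peer
        if (EagerPeers.Remove(peer))
            LazyPeers.Add(peer);
    }
}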

So the first run is going to generate 20 messages on the network. The second is going to generate just 13; you can see the non-traversed paths in the following image:

[Image: the second run, with the pruned paths no longer traversed]

Note that we didn’t pass any messages between I and J, or D and A. But a lot of the savings were achieved by avoiding duplicate notifications. So node I notified node H, but not vice versa. The next time we run this, we have exactly 10 messages passing:

[Image: the third run, where each node receives the message exactly once]

Now, obviously this is pretty cool, but that is under a stable state. What happens when there are failures? Well, at that point, the notion of lazy vs. eager peers comes into play. One of the things we did initially was to clear the duplicate paths in the network, so we could optimize the number of messages being passed. That is pretty cool, but it also leaves us vulnerable to failures. For example, imagine that node H is down. What happens then?

There are two aspects of this that are interesting. Plumtree only cares about message passing; it doesn’t deal with topology changes. In this case, the responsibility for joining the different parts of the network lies with the peer sampling service, which here is HyParView. That would figure out the communication issue and forge new connections with the remaining nodes. Plumtree will get notified about that, and the process continues.

But let us leave that one aside and say that we have a static topology; how would Plumtree handle this? At this point you have to realize that Plumtree doesn’t just drop a connection when a node tells it that it already heard about a message, it just moves it to a lazy state. Periodically, a node will contact the other nodes that told it it wasn’t needed and tell them: “Hi, I got messages with ids (43, 41, 81), do you have them?”. In this way, a node whose contact point went down becomes aware that there are messages it is missing. At that point, it starts a timer, and if it doesn’t hear about those missing messages in time, it will ask the node that announced them to send them over, and initiate an active link. The end result is that we send additional messages, but those tend to be pretty small, just the message ids.
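Continuing the illustrative sketch from above, the lazy side of the protocol can be approximated like this (the timer plumbing is omitted, and assume Receive() from the earlier sketch also stores each payload in _payloads):

// additional state and members on the illustrative PlumtreeNode class
private readonly Dictionary<Guid, string> _payloads = new Dictionary<Guid, string>();
private readonly Dictionary<Guid, PlumtreeNode> _missing = new Dictionary<Guid, PlumtreeNode>();

// called periodically, e.g. on the same heartbeat that drives the topology
public void AnnounceToLazyPeers()
{
    foreach (var peer in LazyPeers)
        peer.IHave(new List<Guid>(_payloads.Keys), this);
}

public void IHave(List<Guid> ids, PlumtreeNode announcer)
{
    foreach (var id in ids)
    {
        if (_payloads.ContainsKey(id) || _missing.ContainsKey(id))
            continue;
        // remember who can give us this message; in the real protocol we now
        // start a timer, and only graft if the message doesn't arrive in time
        _missing[id] = announcer;
    }
}

// called when the timer for a missing message expires
public void OnGraftTimer(Guid id)
{
    if (_payloads.ContainsKey(id))
        return; // the message arrived through another path in the meantime

    PlumtreeNode announcer;
    if (_missing.TryGetValue(id, out announcer) == false)
        return;
    _missing.Remove(id);

    // ask for the full message and promote the link back to an eager one
    LazyPeers.Remove(announcer);
    EagerPeers.Add(announcer);
    announcer.Graft(id, this);
}

public void Graft(Guid id, PlumtreeNode requester)
{
    string payload;
    if (_payloads.TryGetValue(id, out payload))
        requester.Receive(id, payload, this);
}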

During steady state, we’ll just ignore those announcements, but if there is a failure, they let us know that there are messages we are missing, so we can take action to recover them.

There is another important aspect of this behavior: detecting and circumventing slow nodes. If a node is slow to distribute messages to its peers, other nodes will notify those peers that those messages exist, and in that case, we’ll eventually move to a more efficient topology by routing around the slow node.

You can see a full visualization of that (and admire my rapidly improving UI skills) here. The JavaScript implementation of the algorithm is here.

Plumtree has a few weaknesses, the main one being that it is optimized for a single source topology. In other words, the first node you start from will influence the optimization of the network, and if you start a broadcast from another node, it won’t be an optimal operation. That said, there are a few ways to handle that. The actual topology remains the same; what influences Plumtree is the rejection replies from nodes saying that the information transmitted was already received. We can keep track of not only the nodes that rejected us, but the root source of that rejection, so a message originating in E wouldn’t stop us from propagating a message originating in J.

Because Plumtree is meant for very large clusters (the paper talks about testing this with 10,000 nodes), and a message might originate from any one of those, you probably want to limit the notion of “origin”. If you track the past three nodes the message passed through, you get a reasonably small amount of data to keep, and it is likely to be accurate enough to build multiple topologies that optimize themselves based on actual conditions.
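A hedged sketch of what that might look like: instead of one eager/lazy split per node, key the eager set by the broadcast origin (or by the last few hops), so pruning the tree for one origin doesn't affect the others. Again, this is illustrative only:

using System.Collections.Generic;

public class MultiSourcePlumtreeNode
{
    public HashSet<MultiSourcePlumtreeNode> ActivePeers = new HashSet<MultiSourcePlumtreeNode>();

    // the eager set is tracked separately for every broadcast origin
    private readonly Dictionary<string, HashSet<MultiSourcePlumtreeNode>> _eagerByOrigin =
        new Dictionary<string, HashSet<MultiSourcePlumtreeNode>>();

    public HashSet<MultiSourcePlumtreeNode> EagerPeersFor(string origin)
    {
        HashSet<MultiSourcePlumtreeNode> eager;
        if (_eagerByOrigin.TryGetValue(origin, out eager) == false)
        {
            // start optimistically with all active peers, then prune per origin over time
            eager = new HashSet<MultiSourcePlumtreeNode>(ActivePeers);
            _eagerByOrigin[origin] = eager;
        }
        return eager;
    }

    public void PruneFor(string origin, MultiSourcePlumtreeNode peer)
    {
        // a duplicate for a message from this origin only affects this origin's tree
        EagerPeersFor(origin).Remove(peer);
    }
}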

That is it for this post. I’ve got a couple more posts that I want to write about gossip, but that would be it for today.

time to read 7 min | 1229 words

Every once in a while, I like to sit down and read about what is going on outside my current immediate field of interest. This weekend, I chose to focus on efficient information dissemination with a very large number of nodes.

The papers of interest for this weekend are HyParView and Epidemic Broadcast Trees (Plumtree). They are a great read, and complement one another nicely. HyParView is an algorithm that seeks to connect a (potentially very large) set of nodes together without trying to make each node connect to every other node. To simplify things, I’m going to talk about clusters of several dozen nodes; the papers have both been tested with 10,000 nodes and with failure rates of up to 95% of the network. This post is here so I can work out the details in my mind; it may be that I’m wrong, so don’t treat this as a definitive source.

Let us assume that we have a network with 15 nodes in it, and we want to add a new node. One way of doing that would be to maintain a list of all the nodes in the system (that is what admins are for, after all) and have the new node connect to all the other nodes. In this way, we can communicate between all the nodes very easily. Of course, that means that the number of connections we have in a network of 16 (15 + new) nodes is 120. And that utterly ignores the notion of failure. But let us continue down this path, to see what unhappy landscape it is going to land us in.
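That 120 is just the full mesh formula, nodes * (nodes - 1) / 2, at work; a quick sanity check (the helper below is purely illustrative):

// full mesh: every node holds a connection to every other node
static int FullMeshConnections(int nodes)
{
    return nodes * (nodes - 1) / 2;
}

// FullMeshConnections(16)    -> 120
// FullMeshConnections(100)   -> 4,950
// FullMeshConnections(10000) -> 49,995,000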

We have a 15 node cluster, and we add a new node (so we have to give it all the other nodes), and it connects to all the other nodes and registers with them. So far, so good. Now, let us say that there is a state change that we want to send to all the nodes in the network. We can do that by connecting to a single node and having it distribute this information to all the other nodes. The cost of this would be 16 messages (1 to talk to the first node, then 15 for it to talk to the rest). That is very efficient, and it is easy to see that this is the optimal way to disseminate information over the network (each node is only contacted once).

In a 16 node network, maybe that is even feasible; it is a small cluster, after all. But that is a big maybe, and I wouldn’t recommend it. If we grow to a 100 node cluster, that gives us 4,950(!) connections between all nodes, while the cost of sending a single piece of information is still the optimal N. But I think it is easy to see that this isn’t the way to go about it, mostly because you can’t actually do that, not even for the 16 node cluster. Usually when we talk about clusters we like to think about them as flat topologies, but that isn’t actually how it goes. Let us look at a better approximation of a real topology:

[Image: a 16 node cluster spread over several routers, with the green node as the entry point]

Yes, this isn’t quite right, but it is good enough for our purposes.

In this 16 node cluster, we have the green node, which is the one we initially contact to send some data to the entire cluster. What would happen if we tried to talk from that node to all the other nodes? Well, notice how much load it would place on the green node’s router, or on the network in general in the area of the green node. Because of that, straight direct connections across the entire cluster are not something that you really want to do.

An alternative, assuming that you have a fixed topology, is to create a static tree structure: you start with the green node, it contacts three other nodes, and each of those contacts four other nodes (1 + 3 + 12 = 16, so everyone is covered). We still have the nice property that each node only gets the new information once, but we can parallelize the communication and reduce the load on any single segment of the network.

Which is great, if we have a static topology and zero failures. In practice, neither of those is true, so we want something else, hopefully something that is a lot easier to handle. This is where HyParView comes into play. I sat down and wrote a whole big description of how HyParView works, but it wasn’t anything you couldn’t get from the paper. One of the things that I did along the way was create a small implementation in JavaScript and plug it into a graph visualization, so I could follow what is going on.

 

That means that I had to implement the HyParView protocol in JavaScript, but it turned out to be a great way to actually explore how the thing works, and it ended up as a great visualization.

You can see it in action in this url, and you can read the actual code for the HyParView protocol here.

Here is the cluster at 5 nodes, just after we added E:

[Image: the cluster at 5 nodes, just after adding E]

And here it is at 9 nodes, after it had a chance to be stable.

[Image: the cluster at 9 nodes, after stabilizing]

Note that we have connections (the active view) from each node to up to 3 other nodes, but we also have other letters next to the node name, in []. That is the passive list: the list of nodes that we are not connected to, but that we will try if our connection to one of the nodes in the active list goes down.

In addition to just adding themselves to one of the nodes, the nodes will also attempt to learn the topology of the network in such a way that if there is a failure, they can recover from it. The JavaScript code I wrote is not good JavaScript, that isn’t my forte, but it should be enough to follow what is going on. With very little work, we get a self-organizing system of nodes that discover the network.
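As a minimal sketch of that node state, the way I read the paper (the sizes and names here are illustrative, and the real protocol has more moving parts, such as join forwarding and shuffles):

using System;
using System.Collections.Generic;

public class HyParViewNode
{
    private const int MaxActive = 3;   // small, fixed size active view
    private const int MaxPassive = 6;  // larger passive view, used for healing

    public string Name;
    public List<HyParViewNode> ActiveView = new List<HyParViewNode>();
    public List<HyParViewNode> PassiveView = new List<HyParViewNode>();
    private readonly Random _rng = new Random(42); // predictable RNG, like in the demo

    public void OnPeerFailed(HyParViewNode failed)
    {
        if (ActiveView.Remove(failed) == false)
            return;

        // try to heal the active view from the passive view
        while (ActiveView.Count < MaxActive && PassiveView.Count > 0)
        {
            var candidate = PassiveView[_rng.Next(PassiveView.Count)];
            PassiveView.Remove(candidate);
            ActiveView.Add(candidate);      // in the real protocol this is a Neighbor request
            candidate.ActiveView.Add(this); // that the candidate may reject if it is already full
        }
    }

    public void LearnAbout(HyParViewNode peer)
    {
        if (peer == this || ActiveView.Contains(peer) || PassiveView.Contains(peer))
            return;
        if (PassiveView.Count >= MaxPassive)
            PassiveView.RemoveAt(_rng.Next(PassiveView.Count)); // drop a random entry to make room
        PassiveView.Add(peer);
    }
}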

Note that in large networks, none of the nodes would have the full picture of the entire network, but each node will have a partial view of it, and that is enough to send a message through the entire network. But I’m going to talk about this in another post.

In the meantime, go to this url and see it in action (the actual code for the HyParView protocol is here). Note that I've made the different actions explicit, so you need to trigger heartbeats (the algorithm relies on them for healing failures) to get proper behavior from the system. I've also created a predictable RNG, so we can always follow the same path in our iterations.

time to read 1 min | 125 words

To celebrate the new year, we offer a 21% discount on all our products. This is available to the first 33 customers that use the coupon code: 0x21-celebrate-new-year

In previous years, we offered a similar number of uses for the coupon code, and they ran out fast, so hurry up. This offer is valid for:

Happy Holidays and a great new year.

On a personal note, this marks the full release of all our product lines, and it took an incredible amount of work. I'm very pleased that we have been able to get the new version out there and in your hands, and to have you start making use of the features that we have been working on for so long.

time to read 15 min | 2839 words

The Raft protocol gives us a stable replicated distributed log. In other words, all servers in the cluster will agree on all the committed entries to the log (both what they are, and in what position). We usually fill the log with operations that a state machine will execute.

In the Tail/Feather example, the commands are set/del operations on the key value store. Note that this doesn’t mean that all servers will always have the same state. It is possible that a server (or set of servers) will have an outdated view of the log, but the log that they have will match up to the point that they have it.

So, what is the problem? What happens when we have an active system? Every time we make a modification, we’ll add it to the log. That is all well and good, but what about the actual log? It is going to stay there; we need it so we can catch up any new server that joins the cluster. But that means that over time we are going to have unbounded growth, which isn’t a very nice thing to have.

Rachis handles this by asking the state machine to implement snapshots: a way to take the current state of the state machine and transmit it over the network. For example, assume that we have a log full of these entries:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 2}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 3}

// ...

{ Op: "Add", Key: "users/1/login-attempts", "Value": 300000}

The log for that is 300,000 entries long, but the current state of the machine is just:

{ "users/1/login-attempts": 300000 }

Which is obviously much smaller. Rachis doesn’t force a state machine to implement this, but if it isn’t doing so, we can never clear the log. But implementing snapshots has its own problems.
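The contract is conceptually simple: once the state machine has written a snapshot that covers everything up to some index, the log prefix up to that index is redundant and can be thrown away. A rough sketch of the idea (this is not the actual Rachis interface, just an illustration):

using System.Collections.Generic;
using System.IO;

// Illustrative only - not the real Rachis API.
public interface ISnapshottingStateMachine
{
    long LastAppliedIndex { get; }

    // write the current state (not the history of changes) somewhere durable
    void CreateSnapshot(long index, long term);

    // rebuild the state from a snapshot sent by another node
    void ApplySnapshot(long index, long term, Stream snapshot);
}

public static class LogCompaction
{
    // once a snapshot covers a prefix of the log, that prefix can be dropped
    public static void TruncateCoveredPrefix(List<SketchLogEntry> log, long snapshotIndex)
    {
        log.RemoveAll(entry => entry.Index <= snapshotIndex);
    }
}

// stand-in for the real log entry type
public class SketchLogEntry
{
    public long Index;
    public long Term;
    public byte[] Data;
}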

What about the actual cost of creating the snapshot? Imagine that we ask the state machine for a snapshot every 10,000 entries. In the example above, that would mean just writing out { "users/1/login-attempts": 300000 } or whatever the actual current value is. But what if the log looked like this instead:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/2/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/3/login-attempts", "Value": 1}

// ...

{ Op: "Add", Key: "users/300000/login-attempts", "Value": 1}

Note that instead of having 300,000 changes to the same key, we are going to have 300,000 keys. In this case, writing the full list down on every snapshot is very expensive. That is what incremental backups are here to solve.  We let Voron know that this is what we want by specifying:

options.IncrementalBackupEnabled = true;

And now it is time to define policies about taking snapshots. We are going to handle this using Voron full & incremental backups. You can see the logic in the following code.

public void CreateSnapshot(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    // we have no snapshot files, so this is the first time that we create a snapshot
    // we handle that by asking voron to create a full backup
    var files = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");
    Array.Sort(files, StringComparer.OrdinalIgnoreCase); // make sure we get it in sort order
    if (files.Any() == false)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    string lastFullBackup = null;
    int fullBackupIndex = -1;
    for (int i = files.Length - 1; i >= 0; i--)
    {
        if (!Path.GetFileName(files[i]).StartsWith("Full")) // GetFiles returns full paths, so check just the file name
            continue;
        fullBackupIndex = i;
        lastFullBackup = files[i];
        break;
    }
            
    if (lastFullBackup == null)
    {
        // this shouldn't be the case, we must always have at least one full backup. 
        // maybe user deleted it? We'll do a full backup here to compensate
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
            
    var fullBackupSize = new FileInfo(lastFullBackup).Length;
    var incrementalBackupsSize = files.Skip(fullBackupIndex + 1).Sum(f => new FileInfo(f).Length);

    // now we need to decide whether to do a full or incremental backup. Doing incremental backups stops
    // making sense if they will take more space than the full backup. Our cutoff point is when they pass 50%
    // of the size of the full backup.
    // If the full backup size is 1 GB, and we have 25 incremental backups that are 600 MB in size, we need to transfer
    // 1.6 GB to restore. If we generate a new full backup, we'll only need to transfer 1 GB to restore.

    if (incrementalBackupsSize > fullBackupSize / 2)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }

    DeleteOldSnapshots(files.Take(fullBackupIndex)); // delete snapshots older than the current full backup

    var incrementalBackup = new IncrementalBackup();
    incrementalBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Inc-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set);
}

private void DoFullBackup(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    var snapshotsToDelete = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");

    var fullBackup = new FullBackup();
    fullBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Full-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set
        );

    DeleteOldSnapshots(snapshotsToDelete);
}

private static void DeleteOldSnapshots(IEnumerable<string> snapshotsToDelete)
{
    foreach (var snapshot in snapshotsToDelete)
    {
        try
        {
            File.Delete(snapshot);
        }
        catch (Exception)
        {
            // we ignore snapshots we can't delete, they are expected if we are concurrently writing
            // the snapshot and creating a new one. We'll get them the next time.
        }
    }
}

Basically, we need to strike a balance between full and incremental backups. We do that by first taking a full backup, and then taking incremental backups until they take up more than 50% of the full backup’s size, at which point we are probably better off doing another full backup. Note that we use the occasion of a full backup to clear the old incremental and full backup files.

And with that, we can move on to actually sending the snapshot over the wire. This is exposed by the GetSnapshotWriter() method, which just hands all the responsibility off to the SnapshotWriter:

public ISnapshotWriter GetSnapshotWriter()
{
    return new SnapshotWriter(this);
}

public class SnapshotWriter : ISnapshotWriter
{
    private readonly KeyValueStateMachine _parent;

    private List<FileStream> _files = new List<FileStream>();

    public SnapshotWriter(KeyValueStateMachine parent)
    {
        _parent = parent;
        var files = Directory.GetFiles(_parent._storageEnvironment.Options.BasePath, "*.Snapshot");
        var fullBackupIndex = GetFullBackupIndex(files);

        if (fullBackupIndex == -1)
            throw new InvalidOperationException("Could not find a full backup file to start the snapshot writing");

        var last = Path.GetFileNameWithoutExtension(files[files.Length-1]);
        Debug.Assert(last != null);
        var parts = last.Split('-');
        if(parts.Length != 3)
            throw new InvalidOperationException("Invalid snapshot file name " + files[files.Length - 1] + ", could not figure out index & term");

        Index = long.Parse(parts[1]);
        Term = long.Parse(parts[2]);

        for (int i = fullBackupIndex; i < files.Length; i++)
        {
            _files.Add(File.OpenRead(files[i]));
        }
    }

    public void Dispose()
    {
        foreach (var file in _files)
        {
            file.Dispose();
        }
    }

    public long Index { get; private set; }
    public long Term { get; private set; }
    public void WriteSnapshot(Stream stream)
    {
        var writer = new BinaryWriter(stream);
        writer.Write(_files.Count);
        foreach (var file in _files)
        {
            writer.Write(file.Name);
            writer.Write(file.Length);
            writer.Flush();
            file.CopyTo(stream);
        }
    }
}

What is going on here? We get the snapshot files and find the latest full backup, then we open all the files that we’ll need for the snapshot (the last full backup and everything afterward). We need to open them in the constructor so they are locked against deletion by the CreateSnapshot() method.

Then we just concatenate them all and send them over the wire. And getting them? That is pretty easy as well:

public void ApplySnapshot(long term, long index, Stream stream)
{
    var basePath = _storageEnvironment.Options.BasePath;
    _storageEnvironment.Dispose();

    foreach (var file in Directory.EnumerateFiles(basePath))
    {
        File.Delete(file);
    }

    var files = new List<string>();

    var buffer = new byte[1024*16];
    var reader = new BinaryReader(stream);
    var filesCount = reader.ReadInt32();
    if (filesCount == 0)
        throw new InvalidOperationException("Snapshot cannot contain zero files");
    for (int i = 0; i < filesCount; i++)
    {
        var name = reader.ReadString();
        files.Add(name);
        var len = reader.ReadInt64();
        using (var file = File.Create(Path.Combine(basePath, name)))
        {
            file.SetLength(len);
            var totalFileRead = 0;
            while (totalFileRead < len)
            {
                var read = stream.Read(buffer, 0, (int) Math.Min(buffer.Length, len - totalFileRead));
                if (read == 0)
                    throw new EndOfStreamException();
                totalFileRead += read;
                file.Write(buffer, 0, read);
            }
        }
    }
            
    new FullBackup().Restore(Path.Combine(basePath, files[0]), basePath);

    var options = StorageEnvironmentOptions.ForPath(basePath);
    options.IncrementalBackupEnabled = true;
    //TODO: Copy any other customizations that might have happened on the options

    new IncrementalBackup().Restore(options, files.Skip(1));

    _storageEnvironment = new StorageEnvironment(options);

    using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
    {
        var metadata = tx.ReadTree("$metadata");
        metadata.Add("last-index", EndianBitConverter.Little.GetBytes(index));
        LastAppliedIndex = index;
        tx.Commit();
    }
}

Unpack the snapshot files from the stream, apply the full backup first, then all the incremental backups. Make sure to update the last applied index, and we are set.

time to read 7 min | 1376 words

As I mentioned, Tail/Feather is a weekend project to test out how stuff works for real. After creating the highly available distributed key/value store, we now need to actually build a client API for it.

Externally, that API is going to look like this:

public class TailFeatherClient : IDisposable
{
    public TailFeatherClient(params Uri[] nodes);

    public Task Set(string key, JToken value);

    public Task<JToken> Get(string key);

    public Task Remove(string key);

    public void Dispose();
}

If this wasn’t a weekend project, I would add batch support, but that isn’t important for our purposes right now. The API itself is pretty stupid, which is great, but what about the actual behavior?

We want it to be able to handle dynamic cluster changes, and we need it to be smart about it. A lot of that is shared among all operations, so the next layer of the API is:

public Task Set(string key, JToken value)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/set?key={0}&val={1}",
        Uri.EscapeDataString(key), Uri.EscapeDataString(value.ToString(Formatting.None)))));
}

public async Task<JToken> Get(string key)
{
    var reply = await ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/read?key={0}",
        Uri.EscapeDataString(key))));
    var result = JObject.Load(new JsonTextReader(new StreamReader(await reply.Content.ReadAsStreamAsync())));

    if (result.Value<bool>("Missing"))
        return null;

    return result["Value"];
}

public Task Remove(string key)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/del?key={0}",
        Uri.EscapeDataString(key))));
}

The actual behavior is in ContactServer:

private readonly ConcurrentDictionary<Uri, HttpClient> _cache = new ConcurrentDictionary<Uri, HttpClient>();
private Task<TailFeatherTopology> _topologyTask;

public TailFeatherClient(params Uri[] nodes)
{
    _topologyTask = FindLatestTopology(nodes);
}

private HttpClient GetHttpClient(Uri node)
{
    return _cache.GetOrAdd(node, uri => new HttpClient { BaseAddress = uri });
}

private async Task<TailFeatherTopology> FindLatestTopology(IEnumerable<Uri> nodes)
{
    var tasks = nodes.Select(node => GetHttpClient(node).GetAsync("tailfeather/admin/flock")).ToArray();

    await Task.WhenAny(tasks);
    var topologies = new List<TailFeatherTopology>();
    foreach (var task in tasks)
    {
        var message = task.Result;
        if (message.IsSuccessStatusCode == false)
            continue;

        topologies.Add(new JsonSerializer().Deserialize<TailFeatherTopology>(
            new JsonTextReader(new StreamReader(await message.Content.ReadAsStreamAsync()))));
    }

    return topologies.OrderByDescending(x => x.CommitIndex).FirstOrDefault();
}

private async Task<HttpResponseMessage> ContactServer(Func<HttpClient, Task<HttpResponseMessage>> operation, int retries = 3)
{
    if (retries < 0)
        throw new InvalidOperationException("Cluster is not reachable, or no leader was selected. Out of retries, aborting.");

    var topology = (await _topologyTask ?? new TailFeatherTopology());

    var leader = topology.AllVotingNodes.FirstOrDefault(x => x.Name == topology.CurrentLeader);
    if (leader == null)
    {
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // now we have a leader, we need to try calling it...
    var httpResponseMessage = await operation(GetHttpClient(leader.Uri));
    if (httpResponseMessage.IsSuccessStatusCode == false)
    {
        // we were sent to a different server, let try that...
        if (httpResponseMessage.StatusCode == HttpStatusCode.Redirect)
        {
            var redirectUri = httpResponseMessage.Headers.Location;
            httpResponseMessage = await operation(GetHttpClient(redirectUri));
            if (httpResponseMessage.IsSuccessStatusCode)
            {
                // we successfully contacted the redirected server, this is probably the leader, let us ask it for the topology,
                // it will be there for next time we access it
                _topologyTask = FindLatestTopology(new[] { redirectUri }.Union(topology.AllVotingNodes.Select(x => x.Uri)));

                return httpResponseMessage;
            }
        }

        // we couldn't get to the server, and we didn't get redirected, we'll check in the cluster in general
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // happy path, we are done
    return httpResponseMessage;
}

There is quite a bit going on here, but the basic idea is simple. Starting from the initial list of nodes we have, contact all of them and find the topology with the highest commit index. That is the freshest, so it is most likely to be the current one. From the topology, we take the leader and send all queries to it.

If there is any sort of error, we’ll contact all the other servers to find out who we are supposed to be using now. If we can’t find it after three tries, we give up and let the caller sort it out, probably by retrying once the cluster is in a steady state again.
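For completeness, consuming the client ends up looking just about like you would expect from the interface above (the node URLs here are only an example):

// somewhere inside an async method
using (var client = new TailFeatherClient(
    new Uri("http://localhost:9079"),
    new Uri("http://localhost:9078"),
    new Uri("http://localhost:9077")))
{
    await client.Set("connection-string", JToken.Parse("{ 'Url': 'http://live-test.ravendb.net' }"));

    var value = await client.Get("connection-string");
    Console.WriteLine(value);

    await client.Remove("connection-string");
}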

Now, this is really nice, but it falls under the heading of weekend code. That means that this is quite far from what I would call production code. What is missing?

  • Caching the topology locally in a persistent manner, so we can start from the last known good topology even when the known servers are down.
  • Proper error handling, and in particular error reporting, to make sure that we can understand what is actually going on.
  • Features such as allowing reads from non leaders, testing, etc.

But overall, I’m quite happy with this.

time to read 20 min | 3999 words

Weekend project means just that: I’m trying some things out, and writing something real is the best way to exercise. This isn’t going to be a full blown project, but it should be functional and usable.

The basic idea: I’m going to build a distributed key/value configuration store, similar to etcd. This will allow me to explore how to handle full blown Rachis usage from both the server & client sides.

We want this to be a full blown implementation, which means persistence, snapshots, network API, the works.

In terms of the data model, we’ll go for the simplest possible one: a key/value store. A key is a string of up to 128 characters. A value is a JSON formatted value of up to 16KB. Persistence will be handled by Voron. The persistence part of the project is mostly just Voron, so what we are left with is the following:

public enum KeyValueOperationTypes
{
    Add,
    Del
}

public class KeyValueOperation
{
    public KeyValueOperationTypes Type;
    public string Key;
    public JToken Value;
}

public class OperationBatchCommand : Command
{
    public KeyValueOperation[] Batch { get; set; }
}

This gives us the background for the actual state machine:

public class KeyValueStateMachine : IRaftStateMachine
{
    readonly StorageEnvironment _storageEnvironment;

    public KeyValueStateMachine(StorageEnvironmentOptions options)
    {
        _storageEnvironment = new StorageEnvironment(options);
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            _storageEnvironment.CreateTree(tx, "items");
            var metadata = _storageEnvironment.CreateTree(tx, "$metadata");
            var readResult = metadata.Read("last-index");
            if (readResult != null)
                LastAppliedIndex = readResult.Reader.ReadLittleEndianInt64();
            tx.Commit();
        }
    }

    public event EventHandler<KeyValueOperation> OperatonExecuted;

    protected void OnOperatonExecuted(KeyValueOperation e)
    {
        var handler = OperatonExecuted;
        if (handler != null) handler(this, e);
    }

    public JToken Read(string key)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
        {
            var items = tx.ReadTree("items");

            var readResult = items.Read(key);

            if (readResult == null)
                return null;

            return JToken.ReadFrom(new JsonTextReader(new StreamReader(readResult.Reader.AsStream())));
        }
    }

    public long LastAppliedIndex { get; private set; }

    public void Apply(LogEntry entry, Command cmd)
    {
        var batch = (OperationBatchCommand)cmd;
        Apply(batch.Batch, cmd.AssignedIndex);
    }

    private void Apply(IEnumerable<KeyValueOperation> ops, long commandIndex)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            var items = tx.ReadTree("items");
            var metadata = tx.ReadTree("$metadata");
            metadata.Add("last-index", EndianBitConverter.Little.GetBytes(commandIndex));
            var ms = new MemoryStream();
            foreach (var op in ops)
            {
                switch (op.Type)
                {
                    case KeyValueOperationTypes.Add:
                        ms.SetLength(0);

                        var streamWriter = new StreamWriter(ms);
                        op.Value.WriteTo(new JsonTextWriter(streamWriter));
                        streamWriter.Flush();

                        ms.Position = 0;
                        items.Add(op.Key, ms);
                        break;
                    case KeyValueOperationTypes.Del:
                        items.Delete(op.Key);
                        break;
                    default:
                        throw new ArgumentOutOfRangeException();
                }
                OnOperatonExecuted(op);
            }

            tx.Commit();
        }
    }

    public void Dispose()
    {
        if (_storageEnvironment != null)
            _storageEnvironment.Dispose();
    }
}

As you can see, there isn’t much here. Not surprising, since we are storing a key/value data structure. I’m also ignoring snapshots for now. That is good enough for now; let us go to the network portion of the work. We are going to be using Web API for the network stuff, and we’ll be initializing it like so:

var nodeName = options.NodeName ?? (Environment.MachineName + ":" + options.Port);

var kvso = StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "KeyValue"));
using (var statemachine = new KeyValueStateMachine(kvso))
{
    using (var raftEngine = new RaftEngine(new RaftEngineOptions(
        new NodeConnectionInfo
        {
            Name = nodeName,
            Url = new Uri("http://" + Environment.MachineName + ":" + options.Port),
        },
        StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "Raft")),
        new HttpTransport(nodeName),
        statemachine
        )))
    {
        using (WebApp.Start(new StartOptions
        {
            Urls = { "http://+:" + options.Port + "/" }
        }, builder =>
        {
            var httpConfiguration = new HttpConfiguration();
            RaftWebApiConfig.Register(httpConfiguration);
            httpConfiguration.Properties[typeof(HttpTransportBus)] = new HttpTransportBus(nodeName);
            httpConfiguration.Properties[typeof(RaftEngine)] = raftEngine;
            builder.UseWebApi(httpConfiguration);
        }))
        {
            Console.WriteLine("Ready & processing requests, press ENTER to stop");

            Console.ReadLine();
        }
    }
}

Note that we need to initialize both the state machine and the raft engine, then wire up the raft engine controllers. Now we are pretty much done with setup, and we can turn to the actual semantics of running the cluster. The first thing that I want to do is to set up the baseline, so we create this base controller:

public abstract class TailFeatherController : ApiController
{
    public KeyValueStateMachine StateMachine { get; private set; }
    public RaftEngine RaftEngine { get; private set; }

    public override async Task<HttpResponseMessage> ExecuteAsync(HttpControllerContext controllerContext, CancellationToken cancellationToken)
    {
        RaftEngine = (RaftEngine)controllerContext.Configuration.Properties[typeof(RaftEngine)];
        StateMachine = (KeyValueStateMachine)RaftEngine.StateMachine;
        try
        {
            return await base.ExecuteAsync(controllerContext, cancellationToken);
        }
        catch (NotLeadingException)
        {
            var currentLeader = RaftEngine.CurrentLeader;
            if (currentLeader == null)
            {
                return new HttpResponseMessage(HttpStatusCode.PreconditionFailed)
                {
                    Content = new StringContent("{ 'Error': 'No current leader, try again later' }")
                };
            }
            var leaderNode = RaftEngine.CurrentTopology.GetNodeByName(currentLeader);
            if (leaderNode == null)
            {
                return new HttpResponseMessage(HttpStatusCode.InternalServerError)
                {
                    Content = new StringContent("{ 'Error': 'Current leader " + currentLeader + " is not found in the topology. This should not happen.' }")
                };
            }
            return new HttpResponseMessage(HttpStatusCode.Redirect)
            {
                Headers =
                {
                    Location = leaderNode.Uri
                }
            };
        }
    }
}

That is a lot of error handling, but basically it just gets the right values from the configuration and exposes them to the controller actions, then handles the case where a command that requires a leader hits a follower.

Next step, actually managing the cluster, here we go:

public class AdminController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/admin/fly-with-us")]
    public async Task<HttpResponseMessage> Join([FromUri] string url, [FromUri] string name)
    {
        var uri = new Uri(url);
        name = name ?? uri.Host + (uri.IsDefaultPort ? "" : ":" + uri.Port);

        await RaftEngine.AddToClusterAsync(new NodeConnectionInfo
        {
            Name = name,
            Uri = uri
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }

    [HttpGet]
    [Route("tailfeather/admin/fly-away")]
    public async Task<HttpResponseMessage> Leave([FromUri] string name)
    {
        await RaftEngine.RemoveFromClusterAsync(new NodeConnectionInfo
        {
            Name = name
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }
}

So now we have a way to add and remove nodes from the cluster, which is all the admin stuff that we need to handle right now. Next, we need to actually wire up the operations, and this is done here:

public class KeyValueController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/key-val/read")]
    public HttpResponseMessage Read([FromUri] string key)
    {
        var read = StateMachine.Read(key);
        if (read == null)
        {
            return Request.CreateResponse(HttpStatusCode.NotFound, new
            {
                RaftEngine.State,
                Key = key,
                Missing = true
            });
        }
        return Request.CreateResponse(HttpStatusCode.OK, new
        {
            RaftEngine.State,
            Key = key,
            Value = read
        });
    }

    [HttpGet]
    [Route("tailfeather/key-val/set")]
    public Task<HttpResponseMessage> Set([FromUri] string key, [FromUri] string val)
    {
        JToken jVal;
        try
        {
            jVal = JToken.Parse(val);
        }
        catch (JsonReaderException)
        {
            jVal = val;
        }

        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Add,
            Value = jVal
        };

        return Batch(new[] { op });
    }

    [HttpGet]
    [Route("tailfeather/key-val/del")]
    public Task<HttpResponseMessage> Del([FromUri] string key)
    {
        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Del,
        };

        return Batch(new[] { op });
    }

    [HttpPost]
    [Route("tailfeather/key-val/batch")]
    public async Task<HttpResponseMessage> Batch()
    {
        var stream = await Request.Content.ReadAsStreamAsync();
        var operations = new JsonSerializer().Deserialize<KeyValueOperation[]>(new JsonTextReader(new StreamReader(stream)));

        return await Batch(operations);
    }

    private async Task<HttpResponseMessage> Batch(KeyValueOperation[] operations)
    {
        var taskCompletionSource = new TaskCompletionSource<object>();
        RaftEngine.AppendCommand(new OperationBatchCommand
        {
            Batch = operations,
            Completion = taskCompletionSource
        });
        await taskCompletionSource.Task;

        return Request.CreateResponse(HttpStatusCode.Accepted);
    }
}

And we are pretty much set.

Note that I’ve been writing this post while writing the code, so I’ve made some small changes along the way; you can see the actual code here.

Anyway, we are pretty much done. Now we can compile and try testing what is going on.

First, we seed the cluster by running:

.\TailFeather.exe --port=9079 --DataPath=One --Name=One --Bootstrap

This tells the node that it is allowed to become a leader without having to pre-configure a cluster. This command runs and exits, so now we’ll run three such copies:

  • start .\TailFeather.exe "--port=9079 --DataPath=One --Name=One"
  • start .\TailFeather.exe "--port=9078 --DataPath=Two --Name=Two"
  • start .\TailFeather.exe "--port=9077 --DataPath=Three --Name=Three"

We have all three nodes up and running, so now is the time to actually make use of it:

http://localhost:9079/tailfeather/key-val/set?key=ravendb&val={ 'Url': 'http://live-test.ravendb.net', 'Database': 'Sample' }

In this case, you can see that we are setting a configuration value to point to a RavenDB database on the first node. Note that at this point, we have a single node cluster, and the other two are waiting to join it, but are taking no action.

We can get the value back using:

[Image: getting the value back from the key-val/read endpoint in the browser]
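Based on the Read action defined above, the request and a representative response look roughly like this (the exact State value depends on which node answers):

http://localhost:9079/tailfeather/key-val/read?key=ravendb

{
  "State": "Leader",
  "Key": "ravendb",
  "Value": {
    "Url": "http://live-test.ravendb.net",
    "Database": "Sample"
  }
}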

So far, so good. Now, let us add a second node in by inviting it to fly with our cluster. We do that using the following command:

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9078&name=Two

Which will give us:

[Image: the cluster topology after node Two has joined]

Note that we are using the just-added node to look at this.

Next, we can add the third node.

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9077&name=Three

I would put the image in, but I think you get the point.

This is it for now. We have a highly available, persistent & distributed key/value store. Next, we need to tackle the idea of snapshots and the client API, but I’ll deal with that in another post.

time to read 6 min | 1030 words

Rachis, def: Spinal column, also the distal part of the shaft of a feather that bears the web.

Rachis is the name we picked for RavenDB’s Raft implementation. Raft is a consensus protocol that can actually be understood without resorting to Greek philosophy. You can read all about it in here (there is a very cool interactive visualization there). I would also like to thank Diego Ongaro for both the Raft paper and a lot of help while I tried to understand the finer points of it.

Why Raft?

Raft is a distributed consensus protocol. It allows you to agree on an ordered set of operations across your entire cluster. This means that you can apply the same set of operations on a state machine and end up with the same final state on all nodes in the cluster. It is also drastically simpler to understand than Paxos, which is the better known alternative.

What is Rachis?

Well, it is a Raft implementation. To be rather more exact, it is a Raft implementation with the following features:

  • (Obviously) the ability to manage a distributed set of state machines and reliably commit updates to said state machines.
  • Dynamic topology (nodes can join and leave the cluster on the fly, including state sync).
  • Large state machines (snapshots, efficient transfers, non voting members).
  • ACID local log using Voron.
  • Support for in memory and persistent state machines.
  • Support for voting & non voting members.
  • A lot of small tweaks for best behavior in strange situations (forced step down, leader timeout and many more).

What are you going to use this for?

To reach a consensus, of course. More to the point, we have some really nice ideas for where this is going to allow us to do some really nice stuff. In particular, we want to use it as the backbone for the events and time series replication models.

But I’m getting ahead of myself. Before we do that, I want to build a simple reliable distributed service. We’ll call it Tail/Feather and it will be awesome, in a weekend project kind of way. I’ll post full details about this in my next post.

Where can I look at it?

The current version is here; note that you’ll need to pull Voron as well (from the ravendb repository) to get it working.

How does this work?

You can read the Raft paper and the full thesis, of course, but there are some subtleties that I had to work through (with great help from Diego), so it is worth going into a bit more detail.

Clusters are typically composed of an odd number of servers (3, 5 or 7), which can communicate freely with one another. The startup process for a cluster requires us to designate a single server as the seed. This is the only server that can become the cluster leader during the cluster bootstrap process. This is done to make sure that during startup, before we have had a chance to tell the servers about the cluster topology, they won’t consider themselves single node clusters and start accepting requests before we add them to the cluster.

Only the seed server will become the leader; all the others will wait for instructions. We can then let the seed server know about the other nodes in the cluster. It will initiate a join operation, which will reach out to the other nodes and set up the appropriate cluster topology. At that point, all the servers are on equal footing, and there is no longer any meaningful distinction between them. The notion of a seed node is only relevant for cluster bootstrap; once that is done, all servers have the same configuration, and there isn’t any difference between them.

Dynamically adding and removing nodes from the cluster

Removing a node from the cluster is a simple process. All we need to do is update the cluster topology, and we are done. The removed server will get a notification to let it know that it has been disconnected from the cluster, and will move itself to a passive state (note that it is okay if it doesn’t get this notification, we are just being nice about it).

The process of adding a server is a bit more complex. Not only do we have to add a new node, we need to make sure that it has the same state as all the other nodes in the cluster. In order to do that, we handle it in multiple stages. A node added to the cluster can be in one of three states: Voting (full member of the cluster, able to become a leader), Non Voting (just listening to what is going on, can’t be a leader), or Promotable (getting up to speed with the cluster state). Non voting members are a unique case; they are there to enable some advanced scenarios (cross data center communication, as we currently envision it).

Promotable is a lot more interesting. Adding a node to an existing cluster can be a long process, especially if we are managing a lot of data. In order to handle that, we add the server to the promotable category, at which point we start sending it the state it needs to catch up with the rest of the cluster. Once it has caught up (it has all the committed entries in the cluster), we automatically move it to the voting members of the cluster.

Note that it is fine for crashes to happen throughout this process. The cluster leader can crash during this, and we’ll recover and handle this properly.

Normal operations

During normal operations, there is going to be a leader that accepts all the requests for the cluster and handles committing them cluster wide. During those operations, you can spread reads across the members of the cluster for better performance.

Now, if you don’t mind, I’m going to be writing Tail/Feather now, and see how long it takes.

time to read 6 min | 1115 words

It may not get enough attention, but we have been working on the profilers as well during the past few months.

TLDR; You can get the next generation of NHibernate Profiler and Entity Framework Profiler now, lots of goodies to look at!

I’m sure that a lot of people would be thrilled to hear that we dropped Silverlight in favor of going back to a WPF UI. The idea behind Silverlight was that we would be able to deploy anywhere, including in production, but it just made things harder all around, and customers didn’t like the production profiling mode.

Production Profiling

We have changed how we profile in production. You can now make the following call in your code:

NHibernateProfiler.InitializeForProduction(port, password);

And then connect to your production system:

[Image: connecting the profiler to a production system]

At which point you can profile what is going on in your production system safely and easily. The traffic between your production server and the profiler is SSL encrypted.

NHibernate 4.x and Entity Framework vNext support

The profilers now support the latest versions of NHibernate and Entity Framework. That includes profiling async operations, better suitability for modern apps, and more.

New SQL Paging Syntax

We now properly support the SQL Server paging syntax:

select * from Users
order by Name
offset 0 /* @p0 */ rows fetch next 250 /* @p1 */ rows only

This is great for NHibernate users, who finally can have a sane paging syntax as well as beautiful queries in the profiler.

At a glance view

A lot of the time, you don’t want the profiler to be front and center; you just want to run it and have it there to glance at once in a while. The new compact view gives you just that:

[Image: the new compact, at-a-glance view]

You can park it somewhere on your screen and work normally, glancing over to see if it found anything. This is much less distracting than the full profiler for normal operations.

Scopes and groups

When we started working on the profilers, we followed the “one session per request” rule, and that was pretty good. But a lot of people, especially in the Entity Framework crowd, are using multiple sessions or data contexts in a single request, and they still want to be able to see the operations in a request at a glance. We now allow you to group things, like this:

[Image: operations grouped by request in the profiler]

By default, we use the current request to group things, but we also give you the ability to define your own scopes. So if you are profiling an NServiceBus application, you can set the scope to your message handling by setting ProfilerIntegration.GetCurrentScopeName or explicitly calling ProfilerIntegration.StartScope whenever you want.

Customized profiling

You can now surface troublesome issues directly from your code. If you have an issue with a query, you can mark it for attention using CustomQueryReporting.ReportError(), which will flag it in the UI for further investigation.

You can also just mark interesting pieces in the UI without an error, like so:

using (var db = new Entities(conStr))
{
    var post1 = db.Posts.FirstOrDefault();

    using (ProfilerIntegration.StarStatements("Blue"))
    {
        var post2 = db.Posts.FirstOrDefault();
    }

    var post3 = db.Posts.FirstOrDefault();
    
    ProfilerIntegration.StarStatements();
    var post4 = db.Posts.FirstOrDefault();
    ProfilerIntegration.StarStatementsClear();

    var post5 = db.Posts.FirstOrDefault();
}

Which will result in:

[Image: the starred statements highlighted in the profiler]

Disabling profiler from configuration

You can now disable the profiler by setting:

<add key="HibernatingRhinos.Profiler.Appender.NHibernate" value="Disabled" />

This will avoid initializing the profiler, obviously. The intent is that you can set up production profiling, disable it by default, and enable it selectively if you need to actually figure things out.

Odds & ends

We moved to WebActivatorEx from the deprecated WebActivator, added an XML file for the appender, and fixed a whole bunch of small bugs, the most important among them being:

[Image: the most important of those bug fixes]

 

Linq to SQL, Hibernate and LLBLGen Profilers, RIP

You might have noticed that I talked only about the NHibernate and Entity Framework Profilers. The sales for the rest weren’t what we hoped they would be, and we are no longer going to sell them.

Go get them, there is a new release discount

You can get the NHibernate Profiler and Entity Framework Profiler for a 15% discount for the next two weeks.

time to read 2 min | 285 words

This post isn’t so much about this particular problem as about the way we solved it.

We have a number of ways to track performance problems, but this is a good example: we can see that, for some reason, this test has failed because it took too long to run:

[Image: the failing test, flagged for taking too long to run]

In order to handle that, I didn’t want to just run the test; I don’t actually care that much about the test itself. I wanted to be able to investigate this independently.

To do that, I added:

[Image: the line of code that opens the studio with the test data]

This opens the studio with all the data that we have for this test, which is great, since it means that we can export the data.

[Image: exporting the database from the studio]

That done, we can import it into an instance that we control and start testing the performance. In particular, we can run it under a profiler to see what it is doing.

The underlying reason ended up being an issue with how we flush things to disk, which was easily fixed once we could narrow it down. The problem was just getting it to happen in a reproducible manner. This approach, being able to just stop midway through a test and capture the full state of the system, is invaluable in troubleshooting what is going on.
