Ayende @ Rahien

Gossip much? Operating with partial information, and getting the right result.

Unlike the previous two posts, this is going to be short, primarily because what I want to talk about is what impresses me most about both HyParView and Plumtree. The really nice thing about them is that they are pretty simple, easy to understand, and produce good results.

But the fun part, and what makes it impressive, is that they manage to achieve that with a small set of simple rules, and without any attempt to create a global view. Each node sees only a very small part of the overall data, yet the system still manages to operate, self optimize and get to the correct result. In fact, I made some very minor attempts to play with this at large scale, and saw pretty amazing emergent behavior. Without anyone knowing what is going on globally, we are able to get to the optimal number of interactions in the cluster to distribute information.

That is really pretty cool.

And because this post is too short, I’ll leave you with a question. Given that you have this kind of infrastructure, what would you do with it? What sort of information or operations would you try to send this way?

Gossip much? The gossip epidemic and other issues in polite society

In my previous post, I talked about the Hybrid Partial View protocol, and showed a visualization of how it actually works. Something that is important to note about this protocol: it is mostly meant to create a gossip topology that is resilient to failure. It is not meant to actually send messages; it is meant to serve as the backbone topology (the peer sampling service) for figuring out which nodes are out there.

The reason for that can be seen in the following 10 node cluster (after running heartbeat enough times to get to a stable state):

image

Let us assume that we want to disseminate a message across the cluster. We select node F as our root, and then send a message. The rules are as follows:

  • Each node sends the message to all its active connections (except the sender, of course).
  • A node that gets the same message twice will ignore the duplicate.

Based on those rules, and the topology above, we’re going to have the following chain of messages:

  • F – initial broadcast
  • F -> E, G, J
  • E -> G, I
  • G -> E, H
  • J -> I, H, J
  • H -> C, I
  • C -> B, A
  • B -> D, A
  • A -> B, D
  • D -> A

The total number of messages passed is 20, which is twice as many as the optimal solution would generate.

What is worse, this is a very small network, and as the network grows, so will the number of redundant messages. This approach (called eager gossiping) has a major advantage: because it will traverse all paths in the graph, it will also traverse all the shortest paths. That means that the time to get a message from the origin to all nodes is the smallest possible, but the number of operations is high.
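To make the cost of eager gossiping concrete, here is a minimal sketch that floods a message through a graph using the two rules above and counts how many messages were sent. The `EagerGossip` class and the 5 node ring are made up for illustration; this is not the topology from the image, and not the code behind the visualization.

```csharp
using System;
using System.Collections.Generic;

public static class EagerGossip
{
    // Floods a message from `root` and returns how many messages crossed the network.
    // `graph` maps each node to its active connections (assumed to be symmetric).
    public static int Flood(Dictionary<char, char[]> graph, char root)
    {
        var seen = new HashSet<char> { root };
        var queue = new Queue<(char Node, char? Sender)>();
        queue.Enqueue((root, (char?)null));
        var messages = 0;

        while (queue.Count > 0)
        {
            var (node, sender) = queue.Dequeue();
            foreach (var peer in graph[node])
            {
                if (peer == sender)
                    continue; // never echo the message back to whoever sent it

                messages++;
                if (seen.Add(peer))
                    queue.Enqueue((peer, (char?)node)); // first time the peer sees it, so it forwards it
                // otherwise this was a duplicate, and the peer ignores it
            }
        }
        return messages;
    }

    // A hypothetical 5 node ring: A-B-C-D-E-A.
    public static Dictionary<char, char[]> Ring()
    {
        return new Dictionary<char, char[]>
        {
            ['A'] = new[] { 'B', 'E' },
            ['B'] = new[] { 'A', 'C' },
            ['C'] = new[] { 'B', 'D' },
            ['D'] = new[] { 'C', 'E' },
            ['E'] = new[] { 'D', 'A' },
        };
    }
}
```

On this ring, `EagerGossip.Flood(EagerGossip.Ring(), 'A')` counts 6 messages, while 4 would have been enough to reach the other four nodes. The two extra messages are C and D telling each other something they both already know.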

The Plumtree paper (Epidemic Broadcast Trees) presents a solution to this problem. It tries to minimize the number of messages that are passed, while still maintaining reliability and keeping the distance each message has to travel short.

The way Plumtree works is explained in quite beautiful detail in the paper, but the underlying idea goes like this: we start with the same approach as eager gossiping, but whenever we get a message that we already got, we reply to the source and tell it to stop sending us further messages. This is done so that the next time a message is sent, we can skip the known duplicate path, and reduce the overall number of messages.

So the first run is going to generate 20 messages on the network. The second is going to generate just 13. You can see the non traversed paths in the following image:

image

Note that we didn’t pass any messages between I and J, or D and A. But a lot of the saving was achieved by avoiding duplicate notifications. So node I notified node H, but not vice versa. The next time we run this, we have exactly 10 messages passing:

image

Now, obviously this is pretty cool, but that is under a stable state. What happens when there are failures? Well, at that point, the notion of lazy vs. eager peers comes into play. One of the things we did initially was to clear the duplicate paths in the network, so we can optimize the number of messages being passed. That is pretty cool, but it also leaves us vulnerable to failures. For example, imagine that node H is down. What happens then?

There are two aspects of this that are interesting. Plumtree only cares about message passing. It doesn’t deal with topology changes. In this case, the responsibility to join the different parts of the network lies with the peer sampling service, which is HyParView in this case. It would figure out the communication issue, and forge new connections with the remaining nodes. Plumtree will get notified about that, and the process continues.

But let us leave that one aside. Let us say that we have a static topology: how would Plumtree handle this? Well, at this point you have to realize that Plumtree doesn’t just drop a connection when a node tells it that it already heard about a message. It just moves it to a lazy state. Periodically, a node will contact the nodes that told it that it wasn’t needed and tell them: “Hi, I got messages with ids (43,41,81), do you have them?”. In this way, a node whose contact point went down would become aware that there are missing messages. At that point, it starts a timer, and if it still hasn’t heard about those missing messages when the timer expires, it will ask the node that told it about those messages to send them over, and initiate an active link. The end result here is that we send additional messages, but those tend to be pretty small, just the message ids.

During steady state, we’ll just ignore those messages, but if there is a failure, they can help us recover from errors by letting us know that there are messages that we are missing, and taking action to recover that.
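The eager / lazy bookkeeping can be sketched roughly like this. It is a heavily simplified, single process model: nodes call each other directly, only message ids are tracked, and a node asks for a missing message immediately instead of waiting for a timer as described above. The `PlumtreeNode` class and all of its members are names I made up for the sketch, not the API of the JavaScript implementation.

```csharp
using System;
using System.Collections.Generic;

public class PlumtreeNode
{
    public readonly string Name;
    public readonly HashSet<PlumtreeNode> Eager = new HashSet<PlumtreeNode>();
    public readonly HashSet<PlumtreeNode> Lazy = new HashSet<PlumtreeNode>();
    public readonly HashSet<int> Received = new HashSet<int>();
    public static int PayloadsSent; // counts full messages only, for the demo

    public PlumtreeNode(string name) { Name = name; }

    // All links start out eager, exactly like plain eager gossiping.
    public static void Connect(PlumtreeNode a, PlumtreeNode b)
    {
        a.Eager.Add(b);
        b.Eager.Add(a);
    }

    public void Broadcast(int id) { OnGossip(id, null); }

    public void OnGossip(int id, PlumtreeNode sender)
    {
        if (!Received.Add(id))
        {
            // Duplicate: demote the link on our side and tell the sender to do the same.
            if (sender != null)
            {
                MoveToLazy(sender);
                sender.MoveToLazy(this);
            }
            return;
        }
        foreach (var peer in new List<PlumtreeNode>(Eager))
        {
            if (peer == sender || !Eager.Contains(peer))
                continue; // skip the sender and links that were demoted in the meantime
            PayloadsSent++;
            peer.OnGossip(id, this);
        }
    }

    // The periodic lazy announcement: only the id travels, not the payload.
    public void AnnounceToLazyPeers(int id)
    {
        foreach (var peer in new List<PlumtreeNode>(Lazy))
            peer.OnIHave(id, this);
    }

    public void OnIHave(int id, PlumtreeNode sender)
    {
        if (Received.Contains(id))
            return; // steady state: nothing to do

        // We missed this message: make the link active again and pull the payload over it.
        Lazy.Remove(sender); Eager.Add(sender);
        sender.Lazy.Remove(this); sender.Eager.Add(this);
        PayloadsSent++;
        OnGossip(id, sender);
    }

    private void MoveToLazy(PlumtreeNode peer)
    {
        if (Eager.Remove(peer))
            Lazy.Add(peer);
    }
}
```

With three fully connected nodes, the first broadcast sends 3 payloads (one of them a duplicate), which demotes one link to lazy. The second broadcast then needs only 2 payloads. If a node later misses a message, the id announcement over a lazy link is what lets it notice and pull the message in.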

There is also another important aspect of this behavior: detecting and circumventing slow nodes. If a node is slow to distribute messages to its peers, other nodes will notify those peers that those messages exist, and if that is the case, we’ll eventually move to a more efficient topology by routing around that slow node.

You can see a full visualization of that (and admire my rapidly improving UI skills) here. The JavaScript implementation of the algorithm is here.

Plumtree has a few weaknesses, the main one being that it is optimized for a single source topology. In other words, the first node you start from will influence the optimization of the network, and if you start a broadcast from another node, it won’t be an optimal operation. That said, there are a few ways to handle that. The actual topology remains the same; what influences Plumtree is the rejection replies from nodes that say that the information it transmitted was already received. We can keep track of not only the nodes that rejected us, but also the root source of the rejected message, so a message originating in E wouldn’t stop us from propagating a message originating in J.

Because Plumtree is meant for very large clusters (the paper talks about testing this with 10,000 nodes), and a message might originate from any one of those, you probably want to limit the notion of “origin”. If you track only the last three nodes a message passed through, you get a reasonably small amount of data to keep, and it is likely to be accurate enough to build multiple topologies that will optimize themselves based on actual conditions.

That is it for this post, I’ve got a couple more posts that I want to write about gossips, but that would be it for today.

Gossip much? Disseminating information among high number (10K) of nodes

Every once in a while, I like to sit down and read about what is going on outside my current immediate field of interest. This weekend, I chose to focus on efficient information dissemination with a very large number of nodes.

The articles of interest for this weekend are HyParView and Epidemic Broadcast Trees (Plumtrees). They are a great read, and complement one another to a nice degree. HyParView is an algorithm that seeks to connect a (potentially very large) set of nodes together without trying to make each node connect to every other node. To simplify things, I’m going to talk about clusters of several dozen nodes; the algorithms in both articles have been tested with 10,000 nodes and with failure rates of up to 95% of the network. This post is here so I can work out the details in my mind. It may be that I’m wrong, so don’t treat this as a definitive source.

Let us assume that we have a network with 15 nodes in it, and we want to add a new node. One way of doing that would be to maintain a list of all the nodes in the system (that is what admins are for, after all) and have the node connect to all the other nodes. In this way, we can communicate between all the nodes very easily. Of course, that means that the number of connections we have in a network of 16 (15 + new) nodes is 120. And that utterly ignores the notion of failure. But let us continue down this path, to see what unhappy landscape it is going to land us in.

We have a 15 node cluster, and we add a new node (so we have to give it all the other nodes), and it connects to all the other nodes and registers with them. So far, so good. Now, let us say that there is a state change that we want to send to all the nodes in the network. We can do that by connecting to a single node, and having it distribute this information to all the other nodes. The cost of this would be 16 (1 to talk to the first node, then 15 for it to talk to the rest). That is very efficient, and it is easy to prove that this is indeed the optimal way to disseminate information over the network (each node is only contacted once).
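The connection count for this kind of "everyone talks to everyone" setup is just the number of node pairs, n * (n - 1) / 2. A tiny sketch (the `FullMesh` name is mine, purely for illustration):

```csharp
public static class FullMesh
{
    // Every pair of nodes needs its own connection: n * (n - 1) / 2.
    public static long Connections(long nodes)
    {
        return nodes * (nodes - 1) / 2;
    }
}
```

`FullMesh.Connections(16)` gives the 120 connections mentioned above. The number of connections grows quadratically with the cluster size, while the cost of a single broadcast only grows linearly.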

In a 16 node network, maybe that is even feasible. It is a small cluster, after all. But that is a big maybe, and I wouldn’t recommend it. If we grow the cluster to 100 nodes, that gives us 4,950(!) connections between all nodes, and the cost of sending a single piece of information is still the optimal N. But I think it is easy to see that this isn’t the way to go about it, mostly because you can’t do that, not even for the 16 node cluster. Usually when we talk about clusters we like to think about them as flat topologies, but that isn’t actually how it goes. Let us look at a better approximation of a real topology:

image

Yes, this isn’t quite right, but it is good enough for our purposes.

In this 16 node cluster, we have the green node, which is the one we initially contact to send some data to the entire cluster. What would happen if we tried to talk from that node to all the other nodes? Well, notice how much load it would place on the green node’s router, or the general cost for the network in the area of the green node. Because of that, a straight direct connection to the entire cluster is not something that you really want to do.

An alternative, assuming that you have a fixed topology, is to create a static tree structure. You start with the green node, it then contacts three other nodes, which then each contact four other nodes. We still have the nice property that each node gets the new information only once, but we can parallelize the communication and reduce the load on a single segment of the network.

Which is great, if we have a static topology and zero failures. In practice, none of those is true, so we want something else, and hopefully something that would make this a lot easier to handle. This is where HyParView comes into play. I sat down and wrote a whole big description of how HyParView works, but it wasn’t something that you couldn’t get from the article. And one of the things that I did along the way was create a small implementation in JavaScript and plug this into a graph visualization, so I could follow what is going on there.

 

That means that I had to implement the HyParView protocol in JavaScript, but it turned out to be a great way to actually explore how the thing works, and it ended up with great visualization.

You can see it in action in this url, and you can read the actual code for the HyParView protocol here.

Here is the cluster at 5 nodes, just after we added E:

image

And here it is at 9 nodes, after it had a chance to be stable.

image

Note that we have the connections (active view) from each node to up to 3 other nodes, but we also have other letters next to the node name, in []. That is the passive list, the list of nodes that we are not connected to, but will try if our connection to one of the nodes in the active list goes down.
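As a rough sketch of the two lists, a node's view could be modeled like this. The `PartialView` class is hypothetical and leaves out most of the real protocol: HyParView picks passive candidates at random and negotiates with them, while this sketch just tries them in order through a `tryConnect` callback.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class PartialView
{
    private readonly int _maxActive;
    public readonly List<string> Active = new List<string>();
    public readonly List<string> Passive = new List<string>();

    public PartialView(int maxActive) { _maxActive = maxActive; }

    // Remember a peer: connect to it if there is room, otherwise keep it as a backup.
    public void Learn(string peer)
    {
        if (Active.Contains(peer) || Passive.Contains(peer))
            return;
        if (Active.Count < _maxActive)
            Active.Add(peer);
        else
            Passive.Add(peer);
    }

    // Called when a connection drops: replace it with a backup from the passive list.
    public void OnPeerFailed(string peer, Func<string, bool> tryConnect)
    {
        if (!Active.Remove(peer))
            return;
        foreach (var candidate in Passive.ToList())
        {
            Passive.Remove(candidate);
            if (tryConnect(candidate))
            {
                Active.Add(candidate);
                return;
            }
            // candidates we cannot reach are simply forgotten
        }
    }
}
```

With a maximum of 3 active connections, learning about B, C, D, E and F leaves B, C, D active and E, F passive. If C then fails and E turns out to be unreachable as well, the node ends up connected to B, D and F.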

In addition to just adding themselves to one of the nodes, the nodes will also attempt to learn the topology of the network in such a way that if there is a failure, they can recover from it. The JavaScript code I wrote is not good JavaScript code (that isn’t my forte), but it should be enough to follow what is going on there. We are able to do very little work to have a self organizing system of nodes that discover the network.

Note that in large networks, none of the nodes would have the full picture of the entire network, but each node will have a partial view of it, and that is enough to send a message through the entire network. But I’m going to talk about this in another post.

In the meantime, go to this url and see it in action (the actual code for the HyParView protocol is here). Note that I've made the different actions explicit, so you need to do heartbeats (the algorithm relies on them for healing failures) to get proper behavior for the system. I've also created a predictable RNG, so we can always follow the same path in our iterations.

Tail/Feather–Snapshots

The Raft protocol gives us a stable replicated distributed log. In other words, all servers in the cluster will agree on all the committed entries to the log (both what they are, and in what position). We usually fill the log with operations that a state machine will execute.

In the Tail/Feather example, the commands are set/del operations on the key value store. Note that this doesn’t mean that all servers will always have the same state. It is possible that a server (or set of servers) will have an outdated view of the log, but the log that they have will match up to the point that they have it.

So, what is the problem? What happens when we have an active system? Well, every time that we make a modification, we’ll add it to the log. That is all good and great, but what about the actual log? Well, it is going to stay there, we need it so we can catch up any new server that will join the cluster. But that means that over time, we are going to have an unbounded growth. Which isn’t a very nice thing to have.

Rachis handles this by asking the state machine to implement snapshots: a way to take the current state of the state machine and transmit it over the network. For example, assume that we have a log full of these entries:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 2}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 3}

// ...

{ Op: "Add", Key: "users/1/login-attempts", "Value": 300000}

The log for that is 300,000 entries long, but the current state of the machine is:

{ "users/1/login-attempts": 300000 }

Which is obviously much smaller. Rachis doesn’t force a state machine to implement this, but if it isn’t doing so, we can never clear the log. But implementing snapshots has its own problems.
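To illustrate why the snapshot is so much smaller than the log, here is a small sketch that replays a log of operations into the resulting state. The tuple shape and the `LogCompaction` name are mine, this is not how Rachis or the Tail/Feather state machine represent things.

```csharp
using System.Collections.Generic;

public static class LogCompaction
{
    // Replays the log in order; the resulting dictionary is what a snapshot has to contain.
    public static Dictionary<string, long> Replay(IEnumerable<(string Op, string Key, long Value)> log)
    {
        var state = new Dictionary<string, long>();
        foreach (var (op, key, value) in log)
        {
            if (op == "Add")
                state[key] = value; // a later write to the same key replaces the earlier one
            else if (op == "Del")
                state.Remove(key);
        }
        return state;
    }
}
```

Replaying the 300,000 entries from the example above leaves a dictionary with a single key whose value is 300000, and that is all a new server needs to receive.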

What about the actual cost of creating the snapshot? Imagine that we ask the state machine for a snapshot every 10,000 entries. In the example above, that would mean just writing out { "users/1/login-attempts": 300000 } or whatever the actual current value is. But what if the log looks like this instead?

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/2/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/3/login-attempts", "Value": 1}

// ...

{ Op: "Add", Key: "users/300000/login-attempts", "Value": 1}

Note that instead of having 300,000 changes to the same key, we are going to have 300,000 keys. In this case, writing the full list down on every snapshot is very expensive. That is what incremental backups are here to solve. We let Voron know that this is what we want by specifying:

options.IncrementalBackupEnabled = true;

And now it is time to define policies about taking snapshots. We are going to handle this using Voron full & incremental snapshots. You can see the logic in the following code.

public void CreateSnapshot(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    // if we have no snapshot files, this is the first time that we create a snapshot
    // we handle that by asking voron to create a full backup
    var files = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");
    Array.Sort(files, StringComparer.OrdinalIgnoreCase); // make sure we get it in sort order
    if (files.Any() == false)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    string lastFullBackup = null;
    int fullBackupIndex = -1;
    for (int i = files.Length - 1; i >= 0; i--)
    {
        // GetFiles returns full paths, so we have to look at the file name only
        if (!Path.GetFileName(files[i]).StartsWith("Full"))
            continue;
        fullBackupIndex = i;
        lastFullBackup = files[i];
        break;
    }
            
    if (lastFullBackup == null)
    {
        // this shouldn't be the case, we must always have at least one full backup. 
        // maybe user deleted it? We'll do a full backup here to compensate
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
            
    var fullBackupSize = new FileInfo(lastFullBackup).Length;
    var incrementalBackupsSize = files.Skip(fullBackupIndex + 1).Sum(f => new FileInfo(f).Length);

    // now we need to decide whether to do a full or incremental backup. Doing incremental backups stops
    // making sense once they take too much space relative to the full backup. Our cutoff point is when they
    // pass 50% of the size of the full backup.
    // If the full backup size is 1 GB, and we have 25 incremental backups that are 600 MB in total, we need to transfer
    // 1.6 GB to restore. If we generate a new full backup, we'll only need to transfer 1 GB to restore.

    if (incrementalBackupsSize > fullBackupSize / 2)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }

    DeleteOldSnapshots(files.Take(fullBackupIndex)); // delete snapshots older than the current full backup

    var incrementalBackup = new IncrementalBackup();
    incrementalBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Inc-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set);
}

private void DoFullBackup(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    var snapshotsToDelete = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");

    var fullBackup = new FullBackup();
    fullBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Full-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set
        );

    DeleteOldSnapshots(snapshotsToDelete);
}

private static void DeleteOldSnapshots(IEnumerable<string> snapshotsToDelete)
{
    foreach (var snapshot in snapshotsToDelete)
    {
        try
        {
            File.Delete(snapshot);
        }
        catch (Exception)
        {
            // we ignore snapshots we can't delete, they are expected if we are concurrently writing
            // the snapshot and creating a new one. We'll get them the next time.
        }
    }
}

Basically, we need to strike a balance between full and incremental backups. We do that by first taking a full backup, and then taking incremental backups until our incremental backups take more than 50% of the size of the full backup, at which point we are probably better off doing another full backup. Note that we use the event of a full backup to clear the old incremental and full backup files.
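Stripped of the file handling, the policy boils down to a single comparison. Here it is as a standalone sketch (the `SnapshotPolicy` name is made up, it is not part of Tail/Feather or Voron):

```csharp
public static class SnapshotPolicy
{
    // Take a new full backup once the incremental backups add up to more than
    // half the size of the last full backup; otherwise keep taking incrementals.
    public static bool ShouldTakeFullBackup(long fullBackupSize, long incrementalBackupsSize)
    {
        return incrementalBackupsSize > fullBackupSize / 2;
    }
}
```

With a 1 GB full backup, 400 MB of incremental backups means we take another incremental, while 600 MB of them means that it is time for a new full backup.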

And with that, we can move to actually sending the snapshot over the wire. This is exposed by the GetSnapshotWriter() method, which just hands off all the responsibility to the SnapshotWriter:

public ISnapshotWriter GetSnapshotWriter()
{
    return new SnapshotWriter(this);
}

public class SnapshotWriter : ISnapshotWriter
{
    private readonly KeyValueStateMachine _parent;

    private List<FileStream> _files = new List<FileStream>();

    public SnapshotWriter(KeyValueStateMachine parent)
    {
        _parent = parent;
        var files = Directory.GetFiles(_parent._storageEnvironment.Options.BasePath, "*.Snapshot");
        var fullBackupIndex = GetFullBackupIndex(files);

        if (fullBackupIndex == -1)
            throw new InvalidOperationException("Could not find a full backup file to start the snapshot writing");

        var last = Path.GetFileNameWithoutExtension(files[files.Length-1]);
        Debug.Assert(last != null);
        var parts = last.Split('-');
        if(parts.Length != 3)
            throw new InvalidOperationException("Invalid snapshot file name " + files[files.Length - 1] + ", could not figure out index & term");

        Index = long.Parse(parts[1]);
        Term = long.Parse(parts[2]);

        for (int i = fullBackupIndex; i < files.Length; i++)
        {
            _files.Add(File.OpenRead(files[i]));
        }
    }

    public void Dispose()
    {
        foreach (var file in _files)
        {
            file.Dispose();
        }
    }

    public long Index { get; private set; }
    public long Term { get; private set; }
    public void WriteSnapshot(Stream stream)
    {
        var writer = new BinaryWriter(stream);
        writer.Write(_files.Count);
        foreach (var file in _files)
        {
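            // FileStream.Name is the full path; send only the file name, the receiver uses its own base path
            writer.Write(Path.GetFileName(file.Name));
            writer.Write(file.Length);
            writer.Flush();
            file.CopyTo(stream);
        }
    }
}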

What is going on here? We get the snapshot files, and find the latest full backup, then we open all the files that we’ll need for the snapshot (the last full backup and everything afterward). We need to open them in the constructor to lock them against deletion by the CreateSnapshot() method.

Then we just concatenate them all and send them over the wire. And getting them? That is pretty easy as well:

public void ApplySnapshot(long term, long index, Stream stream)
{
    var basePath = _storageEnvironment.Options.BasePath;
    _storageEnvironment.Dispose();

    foreach (var file in Directory.EnumerateFiles(basePath))
    {
        File.Delete(file);
    }

    var files = new List<string>();

    var buffer = new byte[1024*16];
    var reader = new BinaryReader(stream);
    var filesCount = reader.ReadInt32();
    if (filesCount == 0)
        throw new InvalidOperationException("Snapshot cannot contain zero files");
    for (int i = 0; i < filesCount; i++)
    {
        var name = reader.ReadString();
        files.Add(name);
        var len = reader.ReadInt64();
        using (var file = File.Create(Path.Combine(basePath, name)))
        {
            file.SetLength(len);
            long totalFileRead = 0; // long, so files larger than 2 GB don't overflow the counter
            while (totalFileRead < len)
            {
                var read = stream.Read(buffer, 0, (int) Math.Min(buffer.Length, len - totalFileRead));
                if (read == 0)
                    throw new EndOfStreamException();
                totalFileRead += read;
                file.Write(buffer, 0, read);
            }
        }
    }
            
    new FullBackup().Restore(Path.Combine(basePath, files[0]), basePath);

    var options = StorageEnvironmentOptions.ForPath(basePath);
    options.IncrementalBackupEnabled = true;
    //TODO: Copy any other customizations that might have happened on the options

    new IncrementalBackup().Restore(options, files.Skip(1));

    _storageEnvironment = new StorageEnvironment(options);

    using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
    {
        var metadata = tx.ReadTree("$metadata");
        metadata.Add("last-index", EndianBitConverter.Little.GetBytes(index));
        LastAppliedIndex = index;
        tx.Commit();
    }
}

We unpack the snapshot files from the stream, apply the full backup first, and then all the incremental backups. Make sure to update the last applied index, and we are set.
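The wire format used here is simple: a file count, and then for each file its name, its length and its raw bytes. Here is a self-contained sketch of the same framing that works on in-memory byte arrays instead of snapshot files. The `SnapshotFraming` class is hypothetical and only meant to show the round trip.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class SnapshotFraming
{
    public static void Write(Stream output, IList<KeyValuePair<string, byte[]>> files)
    {
        var writer = new BinaryWriter(output);
        writer.Write(files.Count);                 // how many files follow
        foreach (var file in files)
        {
            writer.Write(file.Key);                // file name, as a length-prefixed string
            writer.Write((long)file.Value.Length); // content length
            writer.Write(file.Value);              // raw content, with no prefix
        }
        writer.Flush();
    }

    public static List<KeyValuePair<string, byte[]>> Read(Stream input)
    {
        var reader = new BinaryReader(input);
        var count = reader.ReadInt32();
        var files = new List<KeyValuePair<string, byte[]>>();
        for (var i = 0; i < count; i++)
        {
            var name = reader.ReadString();
            var length = reader.ReadInt64();
            // the sketch keeps everything in memory, so it only handles files below 2 GB
            var content = reader.ReadBytes(checked((int)length));
            if (content.Length != length)
                throw new EndOfStreamException();
            files.Add(new KeyValuePair<string, byte[]>(name, content));
        }
        return files;
    }
}
```

Writing a list of files to a MemoryStream, rewinding it and reading it back should give the same names and contents in the same order. The order matters, since the full backup has to be restored before the incremental ones.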

Tail/Feather–The client API

As I mentioned, Tail/Feather is a weekend project to test out how stuff works for real. After creating the highly available distributed key/value store, we now need to actually build a client API for it.

Externally, that API is going to look like this:

public class TailFeatherClient : IDisposable
{
    public TailFeatherClient(params Uri[] nodes);

    public Task Set(string key, JToken value);

    public Task<JToken> Get(string key);

    public Task Remove(string key);

    public void Dispose();
}

If this wasn’t a weekend project, I would add batch support, but that isn’t important for our purposes right now. The API itself is pretty stupid, which is great, but what about the actual behavior?

We want it to be able to handle dynamic cluster changes, and we need it to be smart about it. A lot of that is shared among all operations, so the next layer of the API is:

public Task Set(string key, JToken value)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/set?key={0}&val={1}",
        Uri.EscapeDataString(key), Uri.EscapeDataString(value.ToString(Formatting.None)))));
}

public async Task<JToken> Get(string key)
{
    // reads go to the read endpoint (the del endpoint would remove the key instead of returning it)
    var reply = await ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/read?key={0}",
        Uri.EscapeDataString(key))));
    var result = JObject.Load(new JsonTextReader(new StreamReader(await reply.Content.ReadAsStreamAsync())));

    if (result.Value<bool>("Missing"))
        return null;

    return result["Value"];
}

public Task Remove(string key)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/del?key={0}",
        Uri.EscapeDataString(key))));
}

The actual behavior is in ContactServer:

private readonly ConcurrentDictionary<Uri, HttpClient> _cache = new ConcurrentDictionary<Uri, HttpClient>();
private Task<TailFeatherTopology> _topologyTask;

public TailFeatherClient(params Uri[] nodes)
{
    _topologyTask = FindLatestTopology(nodes);
}

private HttpClient GetHttpClient(Uri node)
{
    return _cache.GetOrAdd(node, uri => new HttpClient { BaseAddress = uri });
}

private async Task<TailFeatherTopology> FindLatestTopology(IEnumerable<Uri> nodes)
{
    var tasks = nodes.Select(node => GetHttpClient(node).GetAsync("tailfeather/admin/flock")).ToArray();

    await Task.WhenAny(tasks);
    var topologies = new List<TailFeatherTopology>();
    foreach (var task in tasks)
    {
        var message = task.Result;
        if (message.IsSuccessStatusCode == false)
            continue;

        topologies.Add(new JsonSerializer().Deserialize<TailFeatherTopology>(
            new JsonTextReader(new StreamReader(await message.Content.ReadAsStreamAsync()))));
    }

    return topologies.OrderByDescending(x => x.CommitIndex).FirstOrDefault();
}

private async Task<HttpResponseMessage> ContactServer(Func<HttpClient, Task<HttpResponseMessage>> operation, int retries = 3)
{
    if (retries < 0)
        throw new InvalidOperationException("Cluster is not reachable, or no leader was selected. Out of retries, aborting.");

    var topology = (await _topologyTask ?? new TailFeatherTopology());

    var leader = topology.AllVotingNodes.FirstOrDefault(x => x.Name == topology.CurrentLeader);
    if (leader == null)
    {
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // now we have a leader, we need to try calling it...
    var httpResponseMessage = await operation(GetHttpClient(leader.Uri));
    if (httpResponseMessage.IsSuccessStatusCode == false)
    {
        // we were sent to a different server, let try that...
        if (httpResponseMessage.StatusCode == HttpStatusCode.Redirect)
        {
            var redirectUri = httpResponseMessage.Headers.Location;
            httpResponseMessage = await operation(GetHttpClient(redirectUri));
            if (httpResponseMessage.IsSuccessStatusCode)
            {
                // we successfully contacted the redirected server, this is probably the leader, let us ask it for the topology,
                // it will be there for next time we access it
                _topologyTask = FindLatestTopology(new[] { redirectUri }.Union(topology.AllVotingNodes.Select(x => x.Uri)));

                return httpResponseMessage;
            }
        }

        // we couldn't get to the server, and we didn't get redirected, we'll check in the cluster in general
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // happy path, we are done
    return httpResponseMessage;
}

There is quite a bit going on here. But the basic idea is simple. Starting from the initial list of nodes we have, contact all of them and find the topology with the highest commit index. That means that it is the freshest, so more likely to be the current one. From the topology, we take the leader, and send all queries to the leader.

If there is any sort of error, we’ll contact all the other servers to find out who we are supposed to be using now. If we can’t find the leader after three retries, we give up and let the caller sort it out, probably by retrying once the cluster is in a steady state again.
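The retry behavior on its own, without any HTTP involved, can be sketched like this. It is a synchronous, simplified version: `findLeader` and `send` are stand-ins I made up for the topology lookup and the actual request, and redirects are not modeled.

```csharp
using System;

public static class LeaderRetry
{
    // findLeader: asks the cluster who the current leader is (null if nobody knows).
    // send: runs the operation against that node, returns false if it failed.
    // Makes one initial attempt plus `retries` more, refreshing the leader before each one.
    public static void Execute(Func<string> findLeader, Func<string, bool> send, int retries = 3)
    {
        for (var attempt = 0; attempt <= retries; attempt++)
        {
            var leader = findLeader();
            if (leader != null && send(leader))
                return; // happy path, we are done
        }
        throw new InvalidOperationException("Cluster is not reachable, or no leader was selected. Out of retries, aborting.");
    }
}
```

If the first lookup returns a stale leader and the second one returns the real leader, the operation is sent exactly twice. If no leader can be found at all, the caller gets an InvalidOperationException after the last retry.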

Now, this is really nice, but it falls under the heading of weekend code. That means that this is quite far from what I would call production code. What is missing?

  • Caching the topology locally in a persistent manner so we can restart when the known servers are down from last known good topology.
  • Proper error handling, and in particular, error reporting, to make sure that we can understand what is actually going on.
  • Features such as allowing reads from non leaders, testing, etc.

But overall, I’m quite happy with this.

Tail/Feather–highly available distributed key/value store weekend project

Weekend project means just that, I’m trying some things out, and writing something real is the best way to exercise. This isn’t going to be a full blown project, but it should be functional and usable.

The basic idea is that I’m going to build a distributed key/value configuration store. Similar to etcd, this will allow me to explore how to handle full blown Rachis from both the server & client sides.

We want this to be a full blown implementation, which means persistence, snapshots, network api, the works.

In terms of the data model, we’ll go for the simplest possible one: a key/value store. A key is a string of up to 128 characters. A value is a JSON formatted value of up to 16 KB. Persistence will be handled by Voron. The persistence part of the project is mostly Voron, so what we are left with is the following:

public enum KeyValueOperationTypes
{
    Add,
    Del
}

public class KeyValueOperation
{
    public KeyValueOperationTypes Type;
    public string Key;
    public JToken Value;
}

public class OperationBatchCommand : Command
{
    public KeyValueOperation[] Batch { get; set; }
}
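The size limits mentioned above (keys of up to 128 characters, values of up to 16 KB) are not enforced by these classes. A check along these lines could do it. This is only a sketch: the `KeyValueLimits` name is made up, and I'm assuming that the 16 KB limit means 16 * 1024 bytes of UTF-8 encoded JSON text.

```csharp
using System;
using System.Text;

public static class KeyValueLimits
{
    public const int MaxKeyLength = 128;        // characters
    public const int MaxValueBytes = 16 * 1024; // assumption: measured on the UTF-8 encoded JSON text

    public static bool IsValid(string key, string json)
    {
        if (string.IsNullOrEmpty(key) || key.Length > MaxKeyLength)
            return false;
        return json != null && Encoding.UTF8.GetByteCount(json) <= MaxValueBytes;
    }
}
```

Running this check before an operation is appended to the log would keep oversized entries from being replicated to the whole cluster.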

This gives us the background for the actual state machine:

public class KeyValueStateMachine : IRaftStateMachine
{
    readonly StorageEnvironment _storageEnvironment;

    public KeyValueStateMachine(StorageEnvironmentOptions options)
    {
        _storageEnvironment = new StorageEnvironment(options);
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            _storageEnvironment.CreateTree(tx, "items");
            var metadata = _storageEnvironment.CreateTree(tx, "$metadata");
            var readResult = metadata.Read("last-index");
            if (readResult != null)
                LastAppliedIndex = readResult.Reader.ReadLittleEndianInt64();
            tx.Commit();
        }
    }

    public event EventHandler<KeyValueOperation> OperatonExecuted;

    protected void OnOperatonExecuted(KeyValueOperation e)
    {
        var handler = OperatonExecuted;
        if (handler != null) handler(this, e);
    }

    public JToken Read(string key)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
        {
            var items = tx.ReadTree("items");

            var readResult = items.Read(key);

            if (readResult == null)
                return null;

            return JToken.ReadFrom(new JsonTextReader(new StreamReader(readResult.Reader.AsStream())));
        }
    }

    public long LastAppliedIndex { get; private set; }

    public void Apply(LogEntry entry, Command cmd)
    {
        var batch = (OperationBatchCommand)cmd;
        Apply(batch.Batch, cmd.AssignedIndex);
    }

    private void Apply(IEnumerable<KeyValueOperation> ops, long commandIndex)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            var items = tx.ReadTree("items");
            var metadata = tx.ReadTree("$metadata");
            metadata.Add("last-index", EndianBitConverter.Little.GetBytes(commandIndex));
            var ms = new MemoryStream();
            foreach (var op in ops)
            {
                switch (op.Type)
                {
                    case KeyValueOperationTypes.Add:
                        ms.SetLength(0);

                        var streamWriter = new StreamWriter(ms);
                        op.Value.WriteTo(new JsonTextWriter(streamWriter));
                        streamWriter.Flush();

                        ms.Position = 0;
                        items.Add(op.Key, ms);
                        break;
                    case KeyValueOperationTypes.Del:
                        items.Delete(op.Key);
                        break;
                    default:
                        throw new ArgumentOutOfRangeException();
                }
                OnOperatonExecuted(op);
            }

            tx.Commit();
        }
    }

    public void Dispose()
    {
        if (_storageEnvironment != null)
            _storageEnvironment.Dispose();
    }
}

As you can see, there isn’t much here. Not surprising, since we are storing a key/value data structure. I’m also ignoring snapshots for now. That is good enough for the moment, so let us move on to the network portion of the work. We are going to be using Web API for the network stuff, and we’ll be initializing it like so:

var nodeName = options.NodeName ?? (Environment.MachineName + ":" + options.Port);

var kvso = StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "KeyValue"));
using (var statemachine = new KeyValueStateMachine(kvso))
{
    using (var raftEngine = new RaftEngine(new RaftEngineOptions(
        new NodeConnectionInfo
        {
            Name = nodeName,
            Uri = new Uri("http://" + Environment.MachineName + ":" + options.Port),
        },
        StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "Raft")),
        new HttpTransport(nodeName),
        statemachine
        )))
    {
        using (WebApp.Start(new StartOptions
        {
            Urls = { "http://+:" + options.Port + "/" }
        }, builder =>
        {
            var httpConfiguration = new HttpConfiguration();
            RaftWebApiConfig.Register(httpConfiguration);
            httpConfiguration.Properties[typeof(HttpTransportBus)] = new HttpTransportBus(nodeName);
            httpConfiguration.Properties[typeof(RaftEngine)] = raftEngine;
            builder.UseWebApi(httpConfiguration);
        }))
        {
            Console.WriteLine("Ready & processing requests, press ENTER to stop");

            Console.ReadLine();
        }
    }
}

Note that we need to initialize both the state machine and the raft engine, then wire the raft engine controllers. Now we are pretty much done with setup, and we can turn to the actual semantics of running the cluster. The first thing that I want to do is to set up the baseline, so we create this base controller:

public abstract class TailFeatherController : ApiController
{
    public KeyValueStateMachine StateMachine { get; private set; }
    public RaftEngine RaftEngine { get; private set; }

    public override async Task<HttpResponseMessage> ExecuteAsync(HttpControllerContext controllerContext, CancellationToken cancellationToken)
    {
        RaftEngine = (RaftEngine)controllerContext.Configuration.Properties[typeof(RaftEngine)];
        StateMachine = (KeyValueStateMachine)RaftEngine.StateMachine;
        try
        {
            return await base.ExecuteAsync(controllerContext, cancellationToken);
        }
        catch (NotLeadingException)
        {
            var currentLeader = RaftEngine.CurrentLeader;
            if (currentLeader == null)
            {
                return new HttpResponseMessage(HttpStatusCode.PreconditionFailed)
                {
                    Content = new StringContent("{ 'Error': 'No current leader, try again later' }")
                };
            }
            var leaderNode = RaftEngine.CurrentTopology.GetNodeByName(currentLeader);
            if (leaderNode == null)
            {
                return new HttpResponseMessage(HttpStatusCode.InternalServerError)
                {
                    Content = new StringContent("{ 'Error': 'Current leader " + currentLeader + " is not found in the topology. This should not happen.' }")
                };
            }
            return new HttpResponseMessage(HttpStatusCode.Redirect)
            {
                Headers =
                {
                    Location = leaderNode.Uri
                }
            };
        }
    }
}

That is a lot of code, but basically it just gets the right values from the configuration and exposes them to the controller actions, and then does a lot of error handling for the case where a command that requires a leader hits a follower.

Next step, actually managing the cluster, here we go:

public class AdminController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/admin/fly-with-us")]
    public async Task<HttpResponseMessage> Join([FromUri] string url, [FromUri] string name)
    {
        var uri = new Uri(url);
        name = name ?? uri.Host + (uri.IsDefaultPort ? "" : ":" + uri.Port);

        await RaftEngine.AddToClusterAsync(new NodeConnectionInfo
        {
            Name = name,
            Uri = uri
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }

    [HttpGet]
    [Route("tailfeather/admin/fly-away")]
    public async Task<HttpResponseMessage> Leave([FromUri] string name)
    {
        await RaftEngine.RemoveFromClusterAsync(new NodeConnectionInfo
        {
            Name = name
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }
}

So now we have a way to add and remove nodes from the cluster, which is all the admin stuff that we need to handle right now. Next, we need to actually wire the operations, this is done here:

public class KeyValueController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/key-val/read")]
    public HttpResponseMessage Read([FromUri] string key)
    {
        var read = StateMachine.Read(key);
        if (read == null)
        {
            return Request.CreateResponse(HttpStatusCode.NotFound, new
            {
                RaftEngine.State,
                Key = key,
                Missing = true
            });
        }
        return Request.CreateResponse(HttpStatusCode.OK, new
        {
            RaftEngine.State,
            Key = key,
            Value = read
        });
    }

    [HttpGet]
    [Route("tailfeather/key-val/set")]
    public Task<HttpResponseMessage> Set([FromUri] string key, [FromUri] string val)
    {
        JToken jVal;
        try
        {
            jVal = JToken.Parse(val);
        }
        catch (JsonReaderException)
        {
            jVal = val;
        }

        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Add,
            Value = jVal
        };

        return Batch(new[] { op });
    }

    [HttpGet]
    [Route("tailfeather/key-val/del")]
    public Task<HttpResponseMessage> Del([FromUri] string key)
    {
        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Del,
        };

        return Batch(new[] { op });
    }

    [HttpPost]
    [Route("tailfeather/key-val/batch")]
    public async Task<HttpResponseMessage> Batch()
    {
        var stream = await Request.Content.ReadAsStreamAsync();
        var operations = new JsonSerializer().Deserialize<KeyValueOperation[]>(new JsonTextReader(new StreamReader(stream)));

        return await Batch(operations);
    }

    private async Task<HttpResponseMessage> Batch(KeyValueOperation[] operations)
    {
        var taskCompletionSource = new TaskCompletionSource<object>();
        RaftEngine.AppendCommand(new OperationBatchCommand
        {
            Batch = operations,
            Completion = taskCompletionSource
        });
        await taskCompletionSource.Task;

        return Request.CreateResponse(HttpStatusCode.Accepted);
    }
}

And we are pretty much set.

Note that I’ve been writing this post while I’m writing the code, so I’ve made some small changes along the way. You can see the actual code here.

Anyway, we are pretty much done. Now we can compile and try testing what is going on.

First, we seed the cluster, by running:

.\TailFeather.exe --port=9079 --DataPath=One --Name=One --Bootstrap

This tells the node that it is allowed to become a leader without having to pre-configure a cluster. This command runs and exits, so now we’ll run three such copies:

  • start .\TailFeather.exe "--port=9079 --DataPath=One --Name=One"
  • start .\TailFeather.exe "--port=9078 --DataPath=Two --Name=Two"
  • start .\TailFeather.exe "--port=9077 --DataPath=Three --Name=Three"

We have all three nodes up and running, so now is the time to actually make use of it:

http://localhost:9079/tailfeather/key-val/set?key=ravendb&val={ 'Url': 'http://live-test.ravendb.net', 'Database': 'Sample' }

In this case, you can see that we are setting a configuration value to point to a RavenDB database on the first node. Note that at this point, we have a single node cluster, and the other two nodes are waiting to join it, but are taking no action.

We can get the value back using:

image

So far, so good. Now, let us add a second node in by inviting it to fly with our cluster. We do that using the following command:

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9078&name=Two

Which will give us:

image

Note that we are using the just-added node to look at this.

Next, we can add the third node.

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9077&name=Three

I would put the image in, but I think you get the point.

This is it for now. We have a highly available persistent & distributed key/value store. Next, we need to tackle the idea of snapshots and the client API, but I’ll deal with that in another post.

Introducing Rachis: Raven’s Raft implementation

Rachis, def: Spinal column, also the distal part of the shaft of a feather that bears the web.

Rachis is the name we picked for RavenDB’s Raft implementation. Raft is a consensus protocol that can actually be understood without resorting to Greek philosophy. You can read all about it here (there is a very cool interactive visualization there). I would also like to thank Diego Ongaro for both the Raft paper and a lot of help while I tried to understand the finer points of it.

Why Raft?

Raft is a distributed consensus protocol. It allows you to reach an ordered set of operations across your entire cluster. This means that you can apply a set of operations on a state machine, and end up with the same final state on all nodes in the cluster. It is also drastically simpler to understand than Paxos, which is the better known alternative.

What is Rachis?

Well, it is a Raft implementation. To be rather more exact, it is a Raft implementation with the following features:

  • (Obviously) the ability to manage a distributed set of state machines and reliably commit updates to said state machines.
  • Dynamic topology (nodes can join and leave the cluster on the fly, including state sync).
  • Large state machines (snapshots, efficient transfers, non voting members).
  • ACID local log using Voron.
  • Support for in memory and persistent state machines.
  • Support for voting & non voting members.
  • A lot of small tweaks for best behavior in strange situations (forced step down, leader timeout and many more).

What are you going to use this for?

To reach a consensus, of course :-). More to the point, we have some really nice ideas about what this is going to allow us to do. In particular, we want to use it as the backbone for the event and time series replication models.

But I’m getting ahead of myself. Before we do that, I want to build a simple reliable distributed service. We’ll call it Tail/Feather and it will be awesome, in a weekend project kind of way. I’ll post full details about this in my next post.

Where can I look at it?

The current version is here, note that you’ll need to pull Voron as well (from the ravendb repository) to get it working.

How does this work?

You can read the Raft paper and the full thesis, of course, but there are some subtleties that I had to work through (with great help from Diego), so it is worth going into a bit more detail.

Clusters are typically composed of an odd number of servers (3, 5 or 7), which can communicate freely with one another. The startup process for a cluster requires us to designate a single server as the seed. This is the only server that can become the cluster leader during the cluster bootstrap process. This is done to make sure that during startup, before we had the chance to tell the servers about the cluster topology, they won’t consider themselves a single node cluster and start accepting requests before we add them to the cluster.

Only the seed server will become the leader, and all the others will wait for instructions. We can then let the seed server know about the other nodes in the cluster. It will initiate a join operation, which will reach out to the other node and set up the appropriate cluster topology. At that point, all the servers are on equal footing. The notion of a seed node is only relevant for cluster bootstrap; once that is done, all servers have the same configuration, and there is no longer any meaningful distinction between them.

Dynamically adding and removing nodes from the cluster

Removing a node from the cluster is a simple process. All we need to do is to update the cluster topology, and we are done. The removed server will get a notification to let it know that it has been disconnected from the cluster, and will move itself to a passive state (note that it is okay if it doesn’t get this notification, we are just being nice about it).

The process of adding a server is a bit more complex. Not only do we have to add a new node, we need to make sure that it has the same state as all other nodes in the cluster. In order to do that, we handle it in multiple stages. A node added to the cluster can be in one of three states: Voting (full member of the cluster, able to become a leader), Non Voting (just listening to what is going on, can’t be a leader), Promotable (getting up to speed with the cluster state). Non voting members are a unique case, they are there to enable some advanced scenarios (cross data center communication, as we currently envision it).

Promotable is a lot more interesting. Adding a node to an existing cluster can be a long process, especially if we are managing a lot of data. In order to handle that, we add the server to the promotable category, and start sending it the state it needs to catch up with the rest of the cluster. Once it has caught up with the cluster (it has all the committed entries in the cluster), we will automatically move it to the voting members in the cluster.
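To make the three member states a bit more concrete, here is a minimal sketch of the promotion rule. All the names here (MemberState, ClusterMember, Promotion, LastReplicatedIndex) are made up for illustration and are not the Rachis API; the only thing taken from the description above is that a promotable node becomes a voting member once it has all the committed entries.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; this is not the Rachis API.
public enum MemberState { Promotable, Voting, NonVoting }

public class ClusterMember
{
    public string Name;
    public MemberState State;
    public long LastReplicatedIndex; // last log entry this node has acknowledged
}

public static class Promotion
{
    // Moves every promotable member that has caught up with the cluster's
    // commit index into the voting set, and returns the promoted names.
    // Voting and non voting members are left untouched.
    public static List<string> PromoteCaughtUp(IEnumerable<ClusterMember> members, long commitIndex)
    {
        var promoted = new List<string>();
        foreach (var member in members)
        {
            if (member.State != MemberState.Promotable)
                continue;
            if (member.LastReplicatedIndex < commitIndex)
                continue; // still catching up
            member.State = MemberState.Voting;
            promoted.Add(member.Name);
        }
        return promoted;
    }
}
```

In a real implementation the promotion is itself a topology change that has to go through the log, so this only shows the decision, not how it is committed.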

Note that it is fine for crashes to happen throughout this process. The cluster leader can crash during this, and we’ll recover and handle this properly.

Normal operations

During normal operations, there is going to be a leader that is going to be accepting all the requests for the cluster, and handle committing them cluster wide. During those operations, you can spread reads across members in the cluster, for better performance.

Now, if you don’t mind, I’m going to be writing Tail/Feather now, and see how long it takes.

Transactions are a figment of your imagination

This post is in response to a few comments here. In particular, I get the sense that people expect businesses and systems to behave as if transactions are a real thing. The only problem here is that this is quite far from the truth.

Let me define what I mean by a transaction here. I’m not talking about database transactions, ACID or any such thing. I’m talking about the notion that any interaction between a user and a business, or between two businesses can actually be modeled with the notion of a transaction similar to what we see in databases.

That is, we have an interaction that would either be all there, or won’t be there at all. The most obvious example is the notion of a financial transaction, the notion that we debit an account and then we credit another account. And we have to do that in such a way that either both accounts were modified or neither of them was modified. That is the classic example for database transactions, and it is wrong, as anyone who ever wrote a check or sent a wire transfer can tell you. A good discussion on how that actually works can be found here. Note that in this case, the way money transfer works, in the real world, is that you upload a file over FTP, then wait three to five days to see if the orders you sent were rejected.

Another example is the notion of accepting an order, in a transactional manner. If I accepted your order, after verifying that I have reserved everything, and my warehouse burned down, what do I do with your order? I can hardly roll it back.

To move away from businesses, let us consider doing something as unimportant as voting in a national election. Logically speaking, this is a pretty simple process. Identify the voter, record their vote, total the votes, select winner. Except that you can go back and force a re-election in a particular district if such is needed, or you might find a box of lost votes, or any of a hundred evil little things that crop up in the real world.

Any attempt to model such systems in neat transactional boxes with “all in or none at all” is going to break.

Lies, Service Level Agreements, Trust and failure mores

I had a very interesting discussion with Kelly Sommers on Twitter. But 140 characters isn’t really enough to explain things. Also, it is an interesting topic in general.

Kelly disagreed with this post: http://www.shopify.ca/technology/14909841-kafka-producer-pipeline-for-ruby-on-rails

image

You can read the full discussion here.

The basic premise is that there is a highly reliable distributed queue that is used to process messages, but because they didn’t have operational experience with it, they used a local queue to store the messages before sending them over the network. Kelly seems to think that this is decreasing reliability. I disagree.

The underlying question is simple: when do you consider it acceptable to lose a message? If returning an error to the client is fine, sure, go ahead and do that if you can’t reach the cluster. But if you are currently processing a 10 million dollar order, that is going to kinda suck, and anything that you can do to reduce the chance of that happening is good. Note the key part in this statement: we can only reduce the chance of this happening, we can’t ensure it.

One way to try that is to get a guaranteed SLA from the distributed queue. Once we have that, we can rely on it to work. This seems to be what Kelly is aiming at:

image

And that is true, if you could rely on SLAs. Just this week we had a multi hour, multi region Azure outage. In fact, outages, including outages that violate SLAs, are unfortunately common.

In fact, if we look at recent history, we have:

There are actually more of them, but I think that 5 outages in 2 years is enough to show a pattern.

And note specifically that I’m talking about global outages like the ones above. Most people don’t realize that complex systems operate in a constant mode of partial failure. If you ever read an accident investigation report, you’ll know that there is almost never just a single cause of failure. For example, the road was slippery and the driver was speeding and the ABS system failed and the road barrier foundation rotted since being installed. Even a single change in one of those would turn the accident from a fatal crash into a “didn’t happen” or a “honey, we need a new car”.

You can try to rely on the distributed queue in this case, because it has an SLA. And Toyota also promises that your car won’t suddenly accelerate into a wall, but if you had a Toyota Camry in 2010… well, you know…

From my point of view, saving the data locally before sending it over the network makes a lot of sense. In general, the local state of the machine is much more stable than the network. And if there is an internal failure in the machine, it is usually too hosed to do anything about anyway. I might try to write to disk, and write to the network even if I can’t do that, because I want to do my utmost to not lose the message.

Now, let us consider the possible failure scenarios. I’m starting all of them with the notion that I just got a message for a 10 million dollar order, and I need to send it to the backend for processing.

  1. We can’t communicate with the distributed queue. That can be because it is down, hopefully that isn’t the case, but from our point of view, if our node became split from the network, this has the same effect. We are writing this down to disk, so when we become available again, we’ll be able to forward the stored message to the distributed queue.
  2. We can’t communicate with the disk, maybe it is full, or there is an error, or something like that. We can still talk to the network, so we place it in the distributed queue, and we go on with our lives.
  3. We can’t communicate with the disk, we can’t communicate with the network. We can’t keep it in memory (we might overflow the memory), and anyway, if we are out of disk and network, we are probably going to be rebooted soon anyway. SOL, there is nothing else we can do at this point.
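The three scenarios above can be sketched roughly as follows. This is only an illustration under my own assumptions: trySend and tryStore are hypothetical stand-ins for the distributed queue client and the local disk queue, and the SendOutcome names are made up. The code tries the local write first, then the network, and reports which of the three cases we ended up in.

```csharp
using System;

// Hypothetical outcome names, one per scenario in the list above.
public enum SendOutcome { SentToQueue, StoredLocally, Lost }

public static class StoreAndForward
{
    // trySend / tryStore are placeholders for the real queue client and the
    // local disk queue; each returns false or throws when its target is down.
    public static SendOutcome Handle(string message, Func<string, bool> trySend, Func<string, bool> tryStore)
    {
        // Write locally first, then try the network even if the disk failed.
        bool stored = Attempt(tryStore, message);
        bool sent = Attempt(trySend, message);

        if (sent)
            return SendOutcome.SentToQueue;   // scenario 2, or the happy path
        if (stored)
            return SendOutcome.StoredLocally; // scenario 1: forward it when the queue is back
        return SendOutcome.Lost;              // scenario 3: nothing else we can do
    }

    private static bool Attempt(Func<string, bool> action, string message)
    {
        try
        {
            return action(message);
        }
        catch (Exception)
        {
            return false; // any failure counts as "could not do it"
        }
    }
}
```

A real implementation also needs a background task that forwards the locally stored messages once the queue is reachable again, and removes them after they were accepted.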

Note that the first case assumes that we actually do come back up. If the admin just blew this node away, then the data on that node isn’t coming back, obviously. But since the admin knows that we are storing things locally, s/he will at least try to restore the data from that machine.

We are safer (not safe, but safer than without it). The question is whether this is worth it. If your messages aren’t actually carrying financial information, you can probably just drop a few messages as long as you let the end user know about that, so they can retry. If you really care about each individual message, if it is important enough to go the extra few miles for it, then the store and forward model gives you a measurable level of extra safety.

Large scale distributed consensus approaches: Concurrent consistent decisions

So far we tackled the idea of a large compute cluster, and a large storage cluster. I mentioned that the problem with the large storage cluster is that it doesn’t handle consistency within itself. Two concurrent requests can hit two storage nodes and make concurrent operations that aren’t synchronized between themselves. That is usually a good thing, since that is what you want for a high throughput system. The less coordination you can get away with, the more you can actually do.

So far, so good, but that isn’t always suitable. Let us consider a case where we need to have a consistent approach, for some business reason. The typical example would be transactions in a bank, but I hate this example, because in the real world banks deal with inconsistency all the time, this is an explicit part of their business model. Let us talk about auctions and bids, instead. We have an auction service, which allows us to run a large number of auctions.

For each auction, users can place bids, and it is important for us that bids are always processed sequentially per auction, because we have to know who placed a bid that is immediately rejected ($1 commission) or a winning bid that was later overbid (no commission except for the actual winner). We’ll leave aside the fact that this is something that we can absolutely figure out from the auction history and say that we need to have it immediate and consistent. How do we go about doing this?

Remember, we have enough load on the system that we are running a cluster with a hundred nodes in it. The rough topology is still this:

image

We have the consensus cluster, which decides on the network topology. In other words, it decides which set of servers is responsible for which auction. What happens next is where it gets interesting.

Instead of just a set of cooperating nodes that share the data between them and each of which can accept both reads and writes, we are going to twist things a bit. Each set of servers is its own consensus cluster for that particular auction. In other words, we first go to the root consensus cluster to get the topology information, then we add another command to the auction’s log. That command goes through the same distributed consensus algorithm between the three nodes. The overall cluster is composed of many consensus clusters, one for each auction.

This means that we have a fully consistent set of operations across the entire cluster, even in the presence of failure. Which is quite nice. The problem here is that you have to have a good way to distinguish between the different consensuses. In this case, an auction is the key per consensus, but it isn’t always so easy to make such a distinction, and it is important that an auction cannot grow large enough to overwhelm the set of servers that it is actually using. In those cases, you can’t really do much beyond relaxing the constraints and going about it in a different manner.

For optimization purposes, you usually don’t run an independent consensus for each of the auctions. Or rather, you do, but you make sure that you’ll share the same communication resources, so for auctions/123 the nodes are D,E,U with E being the leader, while for auctions/321 the nodes are also D,E,U but U is the leader. This gives you the ability to spread processing power among the cluster, and the communication channels (TCP connections, for example) are shared between both auction consensuses.
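As a rough sketch of what the topology held by the root consensus cluster could look like, here is the auctions/123 and auctions/321 example in code. ConsensusGroup and AuctionRouter are hypothetical names of my own; in a real system the leader of each group is elected by that group rather than written into a dictionary.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types: one consensus group per auction, possibly on shared nodes.
public class ConsensusGroup
{
    public string[] Nodes;
    public string Leader;
}

public static class AuctionRouter
{
    // Bids for an auction have to go to the leader of that auction's group.
    public static string LeaderFor(IDictionary<string, ConsensusGroup> topology, string auctionId)
    {
        ConsensusGroup group;
        if (!topology.TryGetValue(auctionId, out group))
            throw new KeyNotFoundException("No consensus group for " + auctionId);
        return group.Leader;
    }

    // Communication channels are per node, not per auction, so what matters
    // for connection count is how many distinct nodes the groups use.
    public static int DistinctNodes(IEnumerable<ConsensusGroup> groups)
    {
        var nodes = new HashSet<string>();
        foreach (var group in groups)
            nodes.UnionWith(group.Nodes);
        return nodes.Count;
    }
}
```

With auctions/123 led by E and auctions/321 led by U on the same D,E,U nodes, the two groups spread the leader work over two machines while still using only three nodes worth of connections.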

Large scale distributed consensus approaches: Large data sets

In my previous post, I talked about how we can design a large cluster for compute bound operations. The nice thing about this is that the actual amount of shared data that you need is pretty small, and you can just distribute that information among your nodes, then let them do stateless computation on that, and you are done.

A much more common scenario is when we can’t just do stateless operations, but need to keep track of what is actually going on. The typical example is a set of users changing data. For example, let us say that we want to keep track of the pages each user visits on our site. (Yes, that is a pretty classic Big Table scenario, I’ll ignore the prior art issue for now). How would we design such a system?

Well, we still have the same considerations. We don’t want a single point of failure, and we want to have a very large number of machines and make the most of their resources.

In this case, we are merely going to change the way we look at the data. We still have the following topology:

image

There is the consensus cluster, which is responsible for cluster wide immediately consistent operations. And there are all the other nodes, which actually handle processing requests and keeping the data.

What kind of decisions do we get to make in the consensus cluster? Those would be:

  • Adding & removing nodes from the entire cluster.
  • Changing the distribution of the data in the cluster.

In other words, the state that the consensus cluster is responsible for is the entire cluster topology. When a request comes in, the cluster topology is used to decide which set of nodes to direct it to.

Typically in such systems, we want to keep the data on three separate nodes, so we get a request, then route it to one of the three nodes that match it. This is done by sharding the data according to the actual user id whose page views we are trying to track.

Distributing the sharding configuration is done as described in the compute cluster example, and the actual handling of requests, or sending the data between the sharded instances is handled by the cluster nodes directly.
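Here is a minimal sketch of how routing by user id could work, assuming the node list comes from the topology that the consensus cluster distributes. ShardRouter and ReplicasFor are hypothetical names, and picking the hashed node plus the next two in the list is just one simple placement scheme that I chose for the example, not something the design above prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class ShardRouter
{
    // FNV-1a hash. Used here only because string.GetHashCode is not
    // guaranteed to be stable across processes, and all nodes must agree.
    private static uint StableHash(string value)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(value))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Returns the nodes that hold the data for this user: the node the hash
    // lands on, plus the following ones in topology order (wrapping around).
    public static List<string> ReplicasFor(string userId, IList<string> nodes, int replicas = 3)
    {
        if (nodes.Count < replicas)
            throw new ArgumentException("Not enough nodes for the requested number of replicas");

        int start = (int)(StableHash(userId) % (uint)nodes.Count);
        var result = new List<string>(replicas);
        for (int i = 0; i < replicas; i++)
            result.Add(nodes[(start + i) % nodes.Count]);
        return result;
    }
}
```

Any node that has the current topology can compute the same three replicas for a user, so a request can be routed without asking the consensus cluster on every call.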

Note that in this scenario, you cannot ensure any kind of safety. Two requests for the same user might hit different nodes, and do separate operations without being able to consider the concurrent operation. Usually, that is a good thing, but that isn’t always the case. But that is a topic for the next post.

Large scale distributed consensus approaches: Computing with a hundred node cluster

I’m using a 100/99 node cluster as the example, but the discussion also applies to smaller clusters (dozens of nodes) and bigger clusters (hundreds or thousands). Pretty much the only reason that you want to go with clusters of that size is that you want to scale out your processing in some manner. I’ve already discussed why a hundred node cluster isn’t a good option for safety reasons.

Consensus algorithms create a single consensus in the entire cluster, usually about an ordered set of operations that are fed to a state machine. The easiest such example would be a dictionary. But it makes no sense to have a single dictionary spread across a hundred nodes. Why would you need to do that? How would it give you the ability to make full use of all of the power of all those nodes?

Usually nodes are used for either computing or storage purposes. Computing is much easier, so let us take that as a good example. A route calculating system needs to do a lot of computations on a relatively small amount of information (the map data). Whenever there is a change in the map (route blocked, new road open, etc), it needs to send the information to all the servers, and make sure that it isn’t lost.

Since calculating routes is expensive (we’ll ignore the options for optimizations and caching for now), we want to scale it to many nodes. And since the source data is relatively small, each node can have a full copy of the data. Under this scenario, the actual problem we have to solve is how to ensure that once we save something to the cluster, it is propagated to the entire cluster.

The obvious way to do this is with a hierarchy:

image

Basically, the big icons are the top cluster, each of which is responsible for updating a set of secondary servers, which is then responsible for updating the tertiary servers.

To be perfectly honest, this looks nice, and even reasonable, but it is going to cause a lot of issues. Sure, the top cluster is resilient to failures, but relying on a node to be up to notify other nodes isn’t so smart. If one of the nodes in the top cluster goes down, then we have about 20% of our cluster that didn’t get the notice, which kind of sucks.

A better approach would be to go with a management system and a gossip background:

image

In other words, the actual decisions are done by the big guys (literally, in this picture). This is a standard consensus cluster (Paxos, Raft, etc). Once a decision has been made by the cluster, we need to send it to the rest of the nodes in the system. We can do that either by just sending the messages to all the nodes, or by selecting a few nodes and have them send the messages to their peers. The protocol for that is something like: “What is the last command id you have? Here is what I have after that.” Assuming that each processing node is connected to a few other servers, that means that we can send the information very quickly to the entire cluster. And even if there are errors, the gossiping server will correct it (note that there is an absolute order of the commands, ensured by the consensus cluster, so there isn’t an issue about agreeing to this, just distributing the data).
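The “what is the last command id you have” exchange can be sketched like this. GossipNode is a hypothetical, in-memory stand-in (no networking, no failure handling), and it assumes command ids are simply 1, 2, 3 and so on in the order fixed by the consensus cluster.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory node; a real one would talk to its peers over the network.
public class GossipNode
{
    // Commands in the order fixed by the consensus cluster;
    // the command at list index i has command id i + 1.
    private readonly List<string> _commands = new List<string>();

    public long LastCommandId
    {
        get { return _commands.Count; }
    }

    // Appends the next commands, in order.
    public void Append(IEnumerable<string> commands)
    {
        _commands.AddRange(commands);
    }

    // "Here is what I have after that."
    public List<string> CommandsAfter(long commandId)
    {
        var start = (int)Math.Min(commandId, _commands.Count);
        return _commands.GetRange(start, _commands.Count - start);
    }

    // One gossip round: ask the peer for its last command id,
    // then send it everything we have after that.
    public void GossipTo(GossipNode peer)
    {
        peer.Append(CommandsAfter(peer.LastCommandId));
    }
}
```

Because the order is already decided, a node that is behind only ever needs a suffix of the log, and gossiping to a node that is already up to date sends nothing.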

Usually the gossip topology follows the actual physical distribution. So the consensus cluster will notify a couple of servers on each rack, and let the servers in the rack gossip among themselves about the new value.

This means that once we send a command to the cluster, the consensus would agree on that, then we would distribute it to the rest of the nodes. There is a gap between the consensus confirming it and the actual distribution to all the nodes, but that is expected in any distributed system. If it is important to apply this at the same time across the entire cluster, the command is usually time activated (which requires clock sync, but that is something that we can blame on the ops team, so we don’t care).

With this system, we can have an eventually consistent set of data across the entire cluster, and we are happy.

Of course, this is something that is only relevant for compute clusters, the kind of thing where you compute a result, return it to the client and that is about it. There are other types of clusters, but I’ll talk about them in my next post.

Large scale distributed consensus approaches: Calculating a way out

A question crossed my desk, and it was interesting enough that I felt it deserves a post. The underlying scenario is this. We have distributed consensus protocols that are built to make sure that we can properly arrive at a decision and have the entire cluster follow it, regardless of failure. Those are things like Paxos or Raft. The problem is that those protocols are all aimed at a relatively small number of nodes, typically 3 – 5. What happens if we need to manage a large number of machines?

Let us assume that we have a cluster of 99 machines. What would happen under this scenario? Well, all consensus algorithms work on top of the notion of a quorum: at least (N/2+1) machines have the same data. For a 3 node cluster, that means that any decision that is on 2 machines is committed, and for a 5 node cluster, it means that any decision that is on 3 machines is committed. What about 99 nodes? Well, a decision would have to be on 50 machines to be committed.

That means making 196 requests (98 x 2) (once for the command, then for the confirmation) for each command. That… is a lot of requests. And I’m not sure that I want to see what it would look like in terms of perf. So just scaling things out in this manner is out.

In fact, this is also pretty strange thing to do. The notion of distributed consensus is that you will reach agreement on a state machine. The easiest way to think about it is that you reach agreement on a set of values among all nodes. But why are you sharing those values among so many nodes? It isn’t for safety, that is for sure.

Assuming that we have a cluster of 5 nodes, with each node having 99% availability (which translates to about 3.5 days of downtime per year). The availability of all nodes in the cluster is 95%, or about 18 days a year.

But we don’t need them to all be up. We just need any three of them to be up. That means that the math is going to be much nicer for us (see here for an actual discussion of the math).

In other words, here are the availability numbers if each node has a 99% availability:

Number of nodes   Quorum   Availability           Downtime
3                 2        99.97%                 ~ 2.5 hours per year
5                 3        99.999% (5 nines)      ~ 5 minutes per year
7                 4        99.9999% (6 nines)     ~ 12 seconds per year
99                50       100%

Note that all of this is based around each node having about 3.5 days of downtime per year. If we can have availability of 99.9% (or about 9 hours a year), the availability story is:

Number of nodes   Quorum   Availability             Downtime
3                 2        99.9997%                 ~ 2 minutes a year
5                 3        99.999999% (8 nines)     ~ 0.3 seconds per year
7                 4        100%
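The numbers in these tables can be reproduced with a short calculation. This is a sketch that assumes node failures are independent, which real clusters only approximate; `QuorumMath` is a hypothetical helper name:

```csharp
using System;

public static class QuorumMath
{
    // Majority quorum: N/2 + 1, using integer division.
    public static int Quorum(int nodes) => nodes / 2 + 1;

    // Probability that at least a quorum of nodes is up, assuming each node
    // is up independently with probability nodeAvailability.
    public static double ClusterAvailability(int nodes, double nodeAvailability)
    {
        double total = 0;
        for (int up = Quorum(nodes); up <= nodes; up++)
        {
            total += Choose(nodes, up)
                     * Math.Pow(nodeAvailability, up)
                     * Math.Pow(1 - nodeAvailability, nodes - up);
        }
        return total;
    }

    // Binomial coefficient C(n, k), built up incrementally to avoid factorials.
    private static double Choose(int n, int k)
    {
        double result = 1;
        for (int i = 1; i <= k; i++)
            result = result * (n - k + i) / i;
        return result;
    }
}
```

Calling `QuorumMath.ClusterAvailability(3, 0.99)` gives roughly 0.9997, matching the first row of the first table; multiplying the remainder by the number of hours in a year gives the downtime column.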

So in rough terms, we can say that going to a 99 node cluster isn’t a good idea. It is quite costly in terms of the number of operations required to ensure a commit, and from a safety perspective, you can get the same safety level at a drastically lower cost.

But there is now another question, what would we actually want to do with a 99 node cluster*? I’ll talk about this in my next post.

* A hundred node cluster only makes sense if you have machines with about 80% availability. In other words, they are down for 2.5 months every year. I don’t think that this is a scenario worth discussing.

Modeling exercise: The grocery store’s checkout model process approach

I posted about the grocery store checkout process exercise before. Now I want to see if I can do a short outline on how I would handle this.

The key aspect from my perspective is that we need to separate the notion of the data we have and the processing of the data. That means that we are going to have the following model:

public class ShoppingCart
{
   public List<ProductInShoppingCart> Products {get;set;}
   public List<Discount> Discounts { get;set; }
}

public class ProductInShoppingCart
{
   public string ProductId;
   public Discount Discount;
}

Note that we explicitly do not have a quantity field here. If we purchase 6 bottles of milk, that would appear six times in the cart. Why is that?

Let us assume that we have a sale of 2 bottles of milk for a 20% discount, or a 3+1 bottles of milk offer. Consider the kind of code you would have to write in the offer code:

  • Find all products that have this offer and have 4 items without discount.
  • Add the discount to those products.
  • After searching for products without a discount, we need to search for products that already have a discount, but where applying this offer instead would give a better option.

In this case, we start by doing:

  • Add bottle of milk
  • Add bottle of milk – 2 for 20% discount is triggered.
  • Add bottle of milk
  • Add bottle of milk – 3+1 offer is triggered, removing the previous discount.

Because this is likely going to be complex, I’m going to write it once: a set of offers and the kind of rules that we want. Then we will give the users the ability to define those rules.

Note that we keep the raw data (products) and the transformations (discounts) separate, so we can always reapply everything without losing any data.
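Here is a rough sketch of what "reapply everything" could look like with one line per item. It assumes that 3+1 means the fourth bottle is free and that it always beats the 2 for 20% offer; `CartLine` and `OfferEngine` are hypothetical, simplified stand-ins for the model above:

```csharp
using System.Collections.Generic;
using System.Linq;

public class CartLine
{
    public string ProductId;
    public string Discount; // null when no offer applies to this item
}

public static class OfferEngine
{
    // Recomputes discounts from the raw lines: clears everything, then
    // prefers 3+1 for each full group of four items and falls back to
    // "2 for 20% off" for any remaining pair.
    public static void Reapply(List<CartLine> cart, string productId)
    {
        var lines = cart.Where(l => l.ProductId == productId).ToList();
        foreach (var line in lines)
            line.Discount = null;

        int index = 0;
        for (; index + 4 <= lines.Count; index += 4)
            lines[index + 3].Discount = "3+1: free";

        for (; index + 2 <= lines.Count; index += 2)
        {
            lines[index].Discount = "2 for 20% off";
            lines[index + 1].Discount = "2 for 20% off";
        }
    }
}
```

Since the discounts are derived data, the previous 20% discount simply disappears when the fourth bottle is scanned and the offers are recomputed, with no special "undo" code.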

Modeling exercise: The grocery store’s checkout model

I went to the supermarket yesterday, and I forgot to get out of work mode, so here is this post.

The grocery store checkout model exercise deals with the following scenario. You have a customer that is scanning products in a self checkout lane, and you need to process the order.

In terms of external environment, you have:

  • ProductScanned ( ProductId: string ) event
  • Complete Order command
  • Products ( Product Id –> Name, Price ) dataset

So far, this is easy, however, you also need to take into account:

  • Sales (1+1, 2+1, 5% off for store brands, 10% off for store brands for loyalty card holders).
  • Purchase of items by weight (apples, bananas, etc).
  • Per customer discount for 5 items.
  • Rules such as alcohol can only be purchased after store clerk authorization.
  • Purchase limits (can only purchase up to 6 items of the same type, except for specific common products)

The nice thing about such an exercise is that it forces you to see how many things you have to juggle for such a seemingly simple scenario.

A result of this would be to see how you would handle relatively complex rules. Given the number of rules we already have, it should be obvious that there are going to be more, and that they are going to be changing on a fairly frequent basis. A better model would be to actually do this over time. So you start with just getting the first part, then you start streaming the other requirements, but what you actually see is the changes in the code over time. So each new requirement causes you to make modifications and accommodate the new behavior.

The end result might be a Git repository that allows you to see the full approach that was used and how it changed over time. Ideally, you should see a lot of churn in the beginning, but then you’ll have a lot less work to do as your architecture settles down.

On site Architecture & RavenDB consulting availabilities: Malmo & New York City

I’m going to have availability for on site consulting in Malmo, Sweden  (17 Sep) and in New York City, NY (end of Sep – beginning of Oct).

If you want me to come by and discuss what you are doing (architecture, NHibernate or RavenDB), please drop me a line.

I’m especially interested in people who need to do “strange” things with data and data access. We are building a set of tailored database solutions for customers now, and we have seen customers show a 750x improvement in performance when we gave them a database that was designed to fit their exact needs, instead of having to contort their application and their database into a seven dimensional loop just to try to store and read what they needed.

The fallacy of distributed transactions

This can be a very short post, just: See CAP. Unfortunately, we have a lot of people who actually have experience in using distributed transactions, and have a good reason to believe that they work. The answer is that yes, they do, as long as you don’t run into some of the interesting edge cases.

By the way, that is not a new observation, see The Two Generals.

Allow me to demonstrate. Assume that we have a distributed system with the following actors:

image

This is a fairly typical setup. You have a worker that pulls messages from a queue and reads/writes to a database based on those messages. To coordinate between them, it uses a transaction coordinator such as MSDTC.

Transaction coordinators use a two phase commit (or sometimes a three phase commit) protocol to ensure that either all the data would be committed, or none of it would be.

The general logic goes like this:

  • For each participant in the transaction, send a prepare message.
    • If the participant answered “prepared”, it is guaranteeing that the transaction can be committed.
  • If any of the participants failed on prepare, abort the whole transaction.
  • If all of the participants successfully prepared, send the commit message to all of them.

The actual details are a bit more involved, obviously, but that is pretty much it.
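As a sketch only, the steps above could be written like this. `IParticipant`, `FakeParticipant` and `TwoPhaseCommit` are hypothetical names for this illustration, and real coordinators such as MSDTC also handle logging, timeouts and recovery, which are left out here:

```csharp
using System.Collections.Generic;

public interface IParticipant
{
    bool Prepare(); // true means "I guarantee that I can commit"
    void Commit();
    void Abort();
}

// In-memory participant, used only to show the flow.
public class FakeParticipant : IParticipant
{
    private readonly bool _canPrepare;
    public string State = "idle";

    public FakeParticipant(bool canPrepare) { _canPrepare = canPrepare; }

    public bool Prepare()
    {
        State = _canPrepare ? "prepared" : "failed";
        return _canPrepare;
    }

    public void Commit() { State = "committed"; }
    public void Abort() { State = "aborted"; }
}

public static class TwoPhaseCommit
{
    // Returns true if the transaction was committed, false if it was aborted.
    public static bool Run(IReadOnlyList<IParticipant> participants)
    {
        var prepared = new List<IParticipant>();
        foreach (var participant in participants)
        {
            if (participant.Prepare() == false)
            {
                // Phase 1 failed: roll back everyone who already prepared.
                foreach (var p in prepared)
                    p.Abort();
                return false;
            }
            prepared.Add(participant);
        }

        // Phase 2: nothing in this loop protects us if we lose contact
        // with a participant after committing only some of the others.
        foreach (var participant in participants)
            participant.Commit();
        return true;
    }
}
```

Note that the second loop is not atomic. A failure between two `Commit()` calls leaves one participant committed and another merely prepared.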

Now, let us take a look at an interesting scenario. Worker #1 is pulling (in a distributed transaction) a message from the queue, and based on that message, it modifies the database. Then it tells the transaction coordinator that it can commit the transaction. At this point, the TC is sending the prepare message to the database and the queue. Both reply that they have successfully prepared the transaction to be committed. The TC sends a commit message to the queue, completing the transaction from its point of view, however, at this point, it is unable to communicate with the database.

What happens now?

Before I answer that, let us look at another such scenario. The TC needs to commit a transaction, it sends a prepare message to the database, and receives a successful reply. However, before it manages to send a prepare message to the queue, it becomes unavailable.

Note that from the point of view of the database, the situation looks exactly the same. It got (and successfully replied to) a Prepare message, then it didn’t hear back from the transaction coordinator afterward.

Now, that is where it gets interesting. In an abstract world, we can just wait with the pending transaction until the connection with the coordinator is resumed, and we can actually get a “commit / abort” notification.

But we aren’t in an abstract world. When we have such a scenario, we are actually locking records in the database (because they are in the process of being modified). What happens when another client comes to us and wants to modify the same record?

For example, it is quite common to host the business logic, queue and transaction coordinator on the same worker instance, while the database is on a separate machine. That means that in the image above, if Worker #1 isn’t available, we recover by directing all the users to the 2nd worker. However, at that point, we have a transaction that was prepared, but not committed.

When the user continues to make requests to our system, the 2nd worker, which has its own queue and transaction coordinator, is going to try and access the user’s record. The same user whose record is currently locked because of the ongoing transaction.

If we just let it hang in this manner, we have essentially created a situation where the user’s data becomes utterly unavailable (at least for writes). In order to resolve that, transactions come with a timeout. So after the timeout has expired, we can roll back that transaction. Of course, that leads to a very interesting situation.

Let us go back to the first scenario we explored. In this scenario, the queue got both Prepare & Commit messages, while the database got just a Prepare message. The timeout has expired, and the database has rolled back the transaction.  In other words, as far as the queue is concerned, the transaction committed, and the message is gone. As far as the database is concerned, that transaction was rolled back, and never happened.

Of course, the chance that something like that can happen in one of your systems? Probably one in a million.

Message passing, performance–take 2

In my previous post, I did some rough “benchmarks” to see how message passing options behave. I got some great comments, and I thought I’d expand on that.

The baseline for this was a blocking queue, and using that we managed to get:

145,271,000 msgs in 00:00:10.4597977 for 13,888,510 ops/sec

And the async BufferBlock, using which we got:

43,268,149 msgs in 00:00:10 for 4,326,815 ops/sec.

Using LMAX Disruptor we got a disappointing:

29,791,996 msgs in 00:00:10.0003334 for 2,979,100 ops/sec

However, it was pointed out that I can significantly improve this if I changed the code to be:

var disruptor = new Disruptor.Dsl.Disruptor<Holder>(() => new Holder(), new SingleThreadedClaimStrategy(256), new YieldingWaitStrategy(), TaskScheduler.Default);

After which we get a very nice:

141,501,999 msgs in 00:00:10.0000051 for 14,150,193 ops/sec

Another request I got was for testing this with a concurrent queue, which is actually what it is meant to do. The code is actually the same as the blocking queue, we just changed Bus<string> to ConcurrentQueue<string>.

Using that, we got:

170,726,000 msgs in 00:00:10.0000042 for 17,072,593 ops/sec

And yes, this is pretty much just because I could. Any of those methods is quite significantly higher than anything close to what I actually need.

Interview challenges: Decompress that

I’m always looking for additional challenges that I can ask people who interview at Hibernating Rhinos, and I ran into an interesting challenge just now.

Implement a Decompress(Stream input, Stream output, byte[][] dictionary) routine for the following protocol:

Prefix byte – composed of the highest bit + 7 bits len

If the highest bit is not set, this is uncompressed data; copy the next len bytes from the input to the output.

If the highest bit is set, this is compressed data; the next byte will be the index in the dictionary, copy len bytes from that dictionary entry to the output.

After writing the code, the next question is going to be, what are the implications of this protocol? What is it going to be good for? What is it going to be bad for?
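One possible reading of the protocol, as a sketch rather than a model answer. It assumes that "copy len bytes from that dictionary entry" means the first len bytes of the entry, and that the input ends cleanly on a prefix byte boundary; `DictionaryCodec` is a made-up class name:

```csharp
using System.IO;

public static class DictionaryCodec
{
    public static void Decompress(Stream input, Stream output, byte[][] dictionary)
    {
        int prefix;
        while ((prefix = input.ReadByte()) != -1)
        {
            int len = prefix & 0x7F;                // low 7 bits: length
            bool compressed = (prefix & 0x80) != 0; // highest bit: compressed flag

            if (compressed)
            {
                int index = input.ReadByte();
                if (index == -1)
                    throw new EndOfStreamException("Missing dictionary index");
                // Assumption: take the first len bytes of the entry.
                output.Write(dictionary[index], 0, len);
            }
            else
            {
                // Stream.Read may return fewer bytes than requested, so loop.
                var buffer = new byte[len];
                int read = 0;
                while (read < len)
                {
                    int n = input.Read(buffer, read, len - read);
                    if (n == 0)
                        throw new EndOfStreamException("Truncated literal run");
                    read += n;
                }
                output.Write(buffer, 0, len);
            }
        }
    }
}
```

Writing it out makes some of the implications visible: a run can be at most 127 bytes, the dictionary can have at most 256 entries, and only a prefix of an entry can be referenced.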

Your customer isn’t a single entity

An interesting issue came up in the comments for my modeling post.  Urmo is saying:

…there are no defined processes, just individual habits (even among people with same set of obligations) with loose coupling on the points where people need to interact. In these companies a software can be a boot that kicks them into more defined and organized operating mode.

This is part of a discussion of software modeling and the kind of thinking you have to do when you approach a system. The problem with Urmo’s approach is that there is a set of implicit assumptions: that the customer is speaking with a single voice, that they actually know what they are doing and that they have the best interests at heart. Yes, it is really hard to create software (or anything, actually) without those, but that happens more frequently than one might desire.

A few years ago I was working on software to manage what was essentially long term temp workers. Long term could be 20 years, and frequently was a number of years. The area in question was caring for invalids, and most of the customers for that company were the elderly. That meant that a worker might stop being required on pretty short notice (the end customer died, care no longer required).

Anyway, that is the back story. The actual problem we ran into was that by the time the development team got into place there was already a very detailed spec, written by a pretty good analyst after many sessions at a luxury hotel conference room. In other words, the spec cost a lot of money to generate, and involved a lot of people from the company’s management.

What it did not include, however, was feedback from the actual people who had to place the workers at particular people’s homes, and eventually pay them for their work. Little things like the 1st of the month (you have 100s of workers coming in to get their hours approved and get paid) weren’t taken into account. The software was very focused on the individual process, and there were a lot of checks to validate input.

What wasn’t there were things like: “How do I efficiently handle many applicants at the same time?”

The current process was paper form based, and they were basically going over the hours submitted, asking minimal questions, and provisionally approving them. Later on, they would do a more detailed scan of the hours, and do any fixups needed. That would be the time that they would also input the data to their old software. In other words, there was an entire messy process going on that the higher ups didn’t even realize was happening.

This includes decisions such as “you need an advance, we’ll register that as 10 extra hours you worked this month, and we’ll deduct it next month” and “you weren’t supposed to go to Mrs. Xyz, you were supposed to go to Mr. Zabc! We can’t pay for all your hours there”, etc.

When we started working on the software, we happened to do a demo to some of the on site people, and they were horrified by what they saw. The new & improved software would end up causing them many more issues, and it would actually result in more paperwork that they would have to manage just so they can make the software happy.

Modeling such things was tough, and at some point (with the client’s reluctant agreement) we essentially threw aside the hundreds of pages of well written spec, and just worked directly with the people who would end up using our software. The solution in the end was to codify many of the actual “business processes” that they were using. Those business processes made sense, and they were what kept the company working for decades. But management didn’t actually realize that they were working in this manner.

And that is leaving aside the “let us change the corporate structure through software” endeavors, which are unfortunately also pretty common.

To summarize, assuming that your client is a single entity, which speaks with one voice and actually knows what they are talking about? Not going to fly for very long. In another case, I had to literally walk a VP of Sales through the process of how a sale is actually happening in his company versus what he thought was happening.

Sometimes this job is like playing a shrink, but for corporations.

Modeling exercise: Flights & Travelers

I just got a really interesting customer inquiry, and I got their approval to share it. The basic problem is booking flights, and how to handle that.

The customer suggested something like the following:

{   //customers/12345
    "Name" : "John Doe",
    "Bookings" : [{
        "FlightId": "flights/1234",
        "BookingId": "1asifyupi",
        "Flight": "EA-4814",
        "From": "Iceland",
        "To" : "Japan",
        "DateBooked" : "2012/1/1"
    }]
}

{   // flights/1234
    "PlaneId": "planes/1234", // centralized miles flown, service history
    "Seats": [
        {
            "Seat": "F16",
            "BookedBy": "1asifyupi"
        }
    ]
}

But that is probably a… suboptimal way to handle this. Let us go over the type of entities that we have here:

  • Customers / Passengers
  • Flights
  • Planes
  • Booking

The key point in here is that each of those is pretty independent. Note that for simplicity’s sake, I’m assuming that the customer is also the passenger (not true in many cases, a company may pay for your flight, so the company is the customer and you are the passenger).

The actual problem the customer is dealing with is that they have thousands of flights, tens or hundreds of thousands of seats and millions of customers competing for those seats.

Let us see if we can break it down to a model that can work for this scenario. A customer deserves its own document, but I wouldn’t store the bookings directly in the customer document. There are many customers that fly a lot, and they are going to have a lot of bookings there. At the same time, there are many bookings that are made for a lot of people at the same time (an entire family flying).

That leaves the Customer’s document with data about the customer (name, email, phone, passport #, etc) as well as details such as # of miles traveled, the frequent flyer status, etc.

Now, we have the notion of flights and bookings. A flight is a (from, to, time, plane), which contains the available seats number. Note that we need to explicitly allow for over booking, since that is a common practice for airlines.

There are several places where we have contention here:

  • When ordering, we want to over book up to a certain limit.
  • When seating (usually 24 – 48 hours before the flight) we want to reserve seats.

The good thing about it is that we actually have a relatively small contention on a particular flight. And the way the airline industry works, we don’t actually need a transaction between creating the booking and taking a seat on the flight.

The usual workflow goes something like this:

  • A “reservation” is made for a particular itinerary.
  • That itinerary is held for 24 – 48 hours.
  • That itinerary is sent to the customer for approval.
  • Customer approves and a booking is made, flight reservations are turned into actual booked seats.

The good thing about this is that because a flight can have up to ~600 seats in it, we don’t really have to worry about contention on a single flight. We can just use normal optimistic concurrency and avoid more complex models. That means that we can just retry on concurrency errors and see where that leads us. The breaking of the actual order into reservation and booking also helps, since we don’t have to coordinate between the actual charge and the reservation on the flight.

Overbooking is handled by setting a limit of how much we allow overbooking, and managing the number of booked seats vs. reserved seats. When we look at the customer data, we show the customer document, along with the most recent orders and the stats. When we look at a particular flight, we can get pretty much all of its state from the flight document itself.
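To make the "retry on concurrency errors" idea concrete, here is a sketch with an in-memory stand-in for the database. `FlightDoc`, `FlightStore` and `Reservations` are all hypothetical; a real document database would do the version check for you on save:

```csharp
using System;

public class FlightDoc
{
    public int Capacity;
    public int OverbookingLimit;
    public int ReservedSeats;
    public int Version; // bumped by the store on every successful write

    public FlightDoc Clone() => (FlightDoc)MemberwiseClone();
}

public class ConcurrencyException : Exception { }

// Stand-in for the database: a single flight document plus a version check.
public class FlightStore
{
    private readonly object _sync = new object();
    private FlightDoc _current;

    public FlightStore(FlightDoc initial) { _current = initial.Clone(); }

    public FlightDoc Load()
    {
        lock (_sync) return _current.Clone();
    }

    public void Save(FlightDoc doc)
    {
        lock (_sync)
        {
            // Someone else wrote since this copy was loaded: reject the write.
            if (doc.Version != _current.Version)
                throw new ConcurrencyException();
            var stored = doc.Clone();
            stored.Version++;
            _current = stored;
        }
    }
}

public static class Reservations
{
    public static bool TryReserve(FlightStore store, int seats, int maxRetries = 3)
    {
        for (int attempt = 0; attempt < maxRetries; attempt++)
        {
            var flight = store.Load();
            if (flight.ReservedSeats + seats > flight.Capacity + flight.OverbookingLimit)
                return false; // past the overbooking limit
            flight.ReservedSeats += seats;
            try
            {
                store.Save(flight);
                return true;
            }
            catch (ConcurrencyException)
            {
                // Lost the race; reload the flight and try again.
            }
        }
        return false;
    }
}
```

With only a few hundred seats per flight, most attempts succeed on the first try, which is why the simple retry loop is enough.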

And the plane’s stats are usually just handled via a map/reduce index on all the flights for that plane.

Now, in the real world, the situation is a bit more complex. We might give out 10 economy seats and 3 business seats to Expedia for a 2 months period, so they manage that, and we have partnership agreements with other airlines, and… but I think that this is a good foundation to start building this on.

The contracts of Lazy vs Task

There was a question about our use of Task<T> and Lazy<T> in RavenDB, and I thought that this is a subtle thing that deserve more than a short email.

The basic difference between Lazy<T> and Task<T> is the kind of contract that they express.

  • Lazy<T> represents a potentially expensive operation that has been deferred. The promise given is that the lazy’s value will be generated the first time that it is needed.
  • Task<T> represents a potentially expensive operation that is currently executing. The promise is that the task’s value will be available on request, hopefully already there by the time you asked.

The major difference is when we are actually starting the operation. Most often, when we return a task, we return a reference to a scheduled / executing task, which will complete whether or not the task’s result will be accessed. Conversely, a lazy whose value was never accessed is not something that will ever execute.
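A small sketch of the two contracts side by side. `LazyVersusTask` and its `Executions` counter are hypothetical, there only to make the start time of the work observable:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LazyVersusTask
{
    // Counts how many times the "expensive" operation actually ran.
    public static int Executions;

    private static int ExpensiveOperation()
    {
        Interlocked.Increment(ref Executions);
        return 42;
    }

    // Deferred: nothing runs until someone reads .Value.
    public static Lazy<int> CreateLazy() =>
        new Lazy<int>(() => ExpensiveOperation());

    // Started: the work is scheduled right away, whether or not
    // anyone ever reads .Result.
    public static Task<int> CreateTask() =>
        Task.Run(() => ExpensiveOperation());
}
```

Creating the lazy leaves `Executions` untouched until `Value` is read (and reading it again does not run the operation a second time), while the task increments the counter once it runs, even if nobody ever looks at its result.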

The use cases for them tend to be quite different.

Lazy<T> is used a lot of the time as a way to handle once and only once creation (basically, a safe singleton pattern), and for actually deferring the execution of work. Sometimes you can avoid having to do something, and it is easier to give a caller a lazy object and only have to pay for that additional work if they access the data.

Task<T> is primarily used to actually parallelize the work, so you’ll have it running while you are doing other stuff. Usually this is used for I/O, since that can be natively parallelized.

With RavenDB, we use Task<T> as the return type of all our async operations, allowing us to take advantage of the async nature of I/O. And we use Lazy<T> to set up a deferred call to the server. When someone actually accesses one of the lazy’s values, we have to provide them with the answer. At this point, we can go to the server with all the pending lazy operations, and save a lot of effort on remote calls to the server.

In fact, in RavenDB 3.0, we have lazy support in the async API. That means that we have methods that return Lazy<Task<T>>, which means: Give me a deferred operation, that when required, will perform in an async manner. That gives me both the ability to combine requests to the server and avoid blocking up a thread while that I/O is in progress.

Corax: The problem of space

Corax is a research project that we have, to see how we can build a full text search library on top of Voron. Along the way, we take the chance to find out how Lucene does things, and what we can do better. Pretty much from the get go, Corax is likely to use more disk space than Lucene, probably significantly so. I would be happy if we could get a merely 50% increase over Lucene.

The reason that this is the case is that Lucene goes to great lengths to save disk space. From storing all integers in variable length format, to prefix compression, to implicitly referencing data in other files. For example, you can see that when you try reading term positions:

TermPositions are ordered by term (the term is implicit, from the .tis file).

Positions entries are ordered by increasing document number (the document number is implicit from the .frq file).

The downside of saving every little bit is that it is a lot more complex to read the data, requiring multiple caches and complex code paths to actually get it properly. It makes a lot of sense: when Lucene was created, disk space was at a premium. I won’t go as far as to say that disk space doesn’t matter, but given a trade off of using more disk space vs. using more memory / complexity, it is much easier to justify disk space usage today*.

* The caveat here is that you need to be careful, because just accessing the disk can be very slow.

One of the major things that we wanted to deal with in Corax is reducing index corruption issues, and seeing if we can simplify things into a transactional system. As a side effect of that, we don’t need to have index segments, and we don’t need to do merges to free disk space. The problem is that in order to handle this, we need to track additional information that Lucene doesn’t need to.

Let us look at the actual data we keep. Here is a very simple index:

using (var fullTextIndex = new FullTextIndex(StorageEnvironmentOptions.CreateMemoryOnly(), new DefaultAnalyzer()))
{
    using (var indexer = fullTextIndex.CreateIndexer())
    {
        indexer.NewIndexEntry();
        indexer.AddField("Name", "Oren Eini");
        indexer.AddField("Email","Ayende@ayende.com");

        indexer.NewIndexEntry();
        indexer.AddField("Name", "Arava Eini");
        indexer.AddField("Email","Arava@houseof.dog");

        indexer.Flush();
    }
}

For each field, we are going to create a multi tree. And for each unique term in the field we have a list of (Index Entry Id, term frequency, boost).

  • @fld_Name
    • arava
      • { 2, 1, 1.0 }  (index id 2, freq 1, boost 1.0)
    • eini
      • { 1,1,1.0 }
      • { 2,1,1.0 }
    • oren
      • { 1,1,1 }
  • @fld_Email
    • arava@houseof.dog
      • { 2,1,1.0 }
    • ayende@ayende.com
      • { 1,1,1.0 }

This is pretty much the equivalent to the way Lucene stores things. Possible space optimizations here include not storing default values (term freq or boost of 1), storing index entry ids as variable ints, etc.

The problem is that while this is actually enough for the way Lucene does things, it is not enough for the way Corax does things. Let us consider the case of deleting a document. How would you go about doing this using the information above?

Lucene does this by marking a document id as deleted, and will purge its details on the next segment merge. That works, but only because a segment merge actually reads & writes all of the relevant segments’ data. Without a segment merge, deleting a document is actually something that would require us to scan all the data in the entire database. This is not really practical. Therefore, we need to store additional data so we can delete it later on. In this case, we have the Docs tree, which has keys for (index entry id, field id and term num). This looks like this:

  • Docs
    • [1,1,1]: oren
    • [1,1,2]: eini
    • [1,2,1]: ayende@ayende.com
    • [2,1,1]: arava
    • [2,1,2]: eini
    • [2,2,1]: arava@houseof.dog

Using this information, we can now remove all traces of a document when it is deleted. However, the problem here is that we need to also keep the terms per document in the index. That really blows up the index size, obviously.
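To illustrate why the extra tree makes deletes cheap, here is an in-memory sketch. It is not Corax’s actual storage layout: `TinyIndex` is a hypothetical class, it uses plain dictionaries instead of Voron trees, and it drops the term frequency and boost values:

```csharp
using System.Collections.Generic;

public class TinyIndex
{
    // field -> term -> index entry ids (plays the role of the per field tree).
    private readonly Dictionary<string, SortedDictionary<string, SortedSet<int>>> _fields =
        new Dictionary<string, SortedDictionary<string, SortedSet<int>>>();

    // index entry id -> (field, term) pairs (plays the role of the Docs tree).
    private readonly Dictionary<int, List<KeyValuePair<string, string>>> _docs =
        new Dictionary<int, List<KeyValuePair<string, string>>>();

    public void Add(int entryId, string field, string term)
    {
        if (_fields.TryGetValue(field, out var terms) == false)
            _fields[field] = terms = new SortedDictionary<string, SortedSet<int>>();
        if (terms.TryGetValue(term, out var postings) == false)
            terms[term] = postings = new SortedSet<int>();
        postings.Add(entryId);

        if (_docs.TryGetValue(entryId, out var entries) == false)
            _docs[entryId] = entries = new List<KeyValuePair<string, string>>();
        entries.Add(new KeyValuePair<string, string>(field, term));
    }

    public List<int> Query(string field, string term)
    {
        if (_fields.TryGetValue(field, out var terms) && terms.TryGetValue(term, out var postings))
            return new List<int>(postings);
        return new List<int>();
    }

    // Uses the per document entries to touch only the terms that the
    // document actually contains, instead of scanning every term.
    public void Delete(int entryId)
    {
        if (_docs.TryGetValue(entryId, out var entries) == false)
            return;
        foreach (var entry in entries)
        {
            var terms = _fields[entry.Key];
            if (terms.TryGetValue(entry.Value, out var postings) == false)
                continue;
            postings.Remove(entryId);
            if (postings.Count == 0)
                terms.Remove(entry.Value);
        }
        _docs.Remove(entryId);
    }
}
```

The cost shows up in `_docs`: every term is stored a second time per document, which is the index size blow up described above.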

The reason for storing the document fields in this peculiar manner is that we also want to reuse this information for sorting. When Lucene needs to sort data, it has to read all of the data from the fields, then recreate the values for all relevant documents. Corax can just serve the data already there.

A pretty obvious step to save space would be to track the terms separately, and use an id in the Docs tree, not the full term. That leads to an interesting problem, because we are going to need to be able to go from term –> id and id –> term, which pretty much requires storing them twice, unfortunately.

Final note, Corax is a research project.

Success: From Opening a Champagne Bottle To Hiding Under the Bed with Said Bottle

During the RavenDB courses* in the past few weeks, I was talking with one of the attendees and I came up with what I think is a great analogy.

* Yes, courses, in the past 4 weeks, I’ve given the RavenDB course 3 times. That probably explains why I don’t remember which course it was.

What are your success metrics? From Opening a Champagne Bottle To Hiding Under the Bed with Said Bottle?

The first success metric is when you have enough users (and, presumably, revenue) to cross the threshold to the Big Boys League. Let us call this the 25,000 users range. That is the moment when you throw a party, go to the store and grab a whole case of champagne bottles and make fancy speeches. Of course, the problem with success is that you can have too much of it. A system that does just fine (maybe creaks a little) with 25,000 users is going to behave pretty differently when you have 100,000 users. That is the moment when you find your engineers under the bed, with a half empty bottle of champagne and muttering things about Out Of Capacity errors and refusing to come out until we fire all the users.

In just about any system, you need to define the success points. Twitter was very lucky that it managed to grow even though it had so many problems when its user base exploded. It is far more likely that users will figure out that your service is… well, your engineers are drunk and hiding under the bed, so the service looks accordingly.

And yes, you can try to talk to people about SLA, and metrics and capacities. But I have found that an image like that tends to give you a lot more focused answers. Even if a lot of the time the answer is “I don’t know”. That is a place to start, because this makes it a lot more acute than just “how many req/sec do we need to support?”.

The Corax Experiment: API

I posted before about design practice for how I would approach building a search engine library. I decided to bite the bullet and actually try to do this. Using Voron, that turned out to be a really simple thing to do. Of course, this doesn’t do a tenth of what Lucene does, but it actually does quite a lot. The code is available here, and I want to emphasize again, this is purely an experimental / research project.

The entire thing comes to less than 500 lines of code. And it is pretty functional even at this stage.

Corax is composed of:

  • Analysis
  • Indexing
  • Querying

Analysis of the documents is handled via analyzers:

    public interface IAnalyzer
    {
        ITokenSource CreateTokenSource(string field, ITokenSource existing);
        bool Process(string field, ITokenSource source);
    }

An analyzer creates a token source, which accepts a TextReader and produces tokens. For each token, the Process method is called, and it is used to do things to the relevant token. For example, here is the default analyzer:

    public class DefaultAnalyzer : IAnalyzer
    {
        readonly IFilter[] _filters =
        {
            new LowerCaseFilter(),
            new RemovePossesiveSuffix(),
            new StopWordsFilter(),
        };

        public ITokenSource CreateTokenSource(string field, ITokenSource existing)
        {
            return existing ?? new StringTokenizer();
        }

        public bool Process(string field, ITokenSource source)
        {
            for (int i = 0; i < _filters.Length; i++)
            {
                if (_filters[i].ProcessTerm(source) == false)
                    return false;
            }
            return true;
        }
    }

The idea here is to match, fairly closely, what Lucene is doing, but hopefully with clearer code. This analyzer will take a stream of text, break it up into discrete tokens, lower case them, remove the possessive ‘s suffix and clear stop words. Note that each of the filters is actually modifying the token in place. And the tokenizer is pretty simple, but it does the job for now.
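The post does not show what the filters look like inside, so the following is only a sketch of how in-place filters of this kind could work. Every type here (`SketchToken`, `ISketchFilter`, the three filters and `SketchAnalyzer`) is a hypothetical stand-in, not the real Corax `ITokenSource` or `IFilter`, and the stop word list is made up for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the current token: a reusable buffer plus a length.
public class SketchToken
{
    public char[] Buffer = new char[256]; // fixed size is an assumption of this sketch
    public int Size;

    public void Set(string text)
    {
        text.CopyTo(0, Buffer, 0, text.Length);
        Size = text.Length;
    }

    public override string ToString() { return new string(Buffer, 0, Size); }
}

public interface ISketchFilter
{
    // Returns false to drop the token entirely.
    bool ProcessTerm(SketchToken token);
}

public class SketchLowerCaseFilter : ISketchFilter
{
    public bool ProcessTerm(SketchToken token)
    {
        // Rewrites the characters in the existing buffer, no new string is created.
        for (int i = 0; i < token.Size; i++)
            token.Buffer[i] = char.ToLowerInvariant(token.Buffer[i]);
        return true;
    }
}

public class SketchPossessiveFilter : ISketchFilter
{
    public bool ProcessTerm(SketchToken token)
    {
        // Strips a trailing 's by shrinking the length.
        if (token.Size >= 2 &&
            token.Buffer[token.Size - 2] == '\'' &&
            token.Buffer[token.Size - 1] == 's')
            token.Size -= 2;
        return true;
    }
}

public class SketchStopWordsFilter : ISketchFilter
{
    private static readonly HashSet<string> StopWords =
        new HashSet<string> { "the", "a", "of" };

    public bool ProcessTerm(SketchToken token)
    {
        // ToString allocates; a real filter would compare against the buffer directly.
        return StopWords.Contains(token.ToString()) == false;
    }
}

public static class SketchAnalyzer
{
    // Same shape as the Process loop above: stop at the first filter that rejects the token.
    public static bool Process(SketchToken token, ISketchFilter[] filters)
    {
        for (int i = 0; i < filters.Length; i++)
        {
            if (filters[i].ProcessTerm(token) == false)
                return false;
        }
        return true;
    }
}
```

With the filters in the order lower case, possessive, stop words, a token set to "Oren's" comes out as "oren", while "The" is rejected and never reaches the index. The order matters: the stop word check only works because the token was lower cased first.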

Now, let us move to indexing. With Lucene, the recommendation is that you’ll reuse your document and field instances, to avoid creating garbage for the GC. With Corax, I took it a step further:

    using (var indexer = fullTextIndex.CreateIndexer())
    {
        indexer.NewDocument();
        indexer.AddField("Name", "Oren Eini");

        indexer.NewDocument();
        indexer.AddField("Name", "Ayende Rahien");

        indexer.Flush();
    }

There are a couple of things to note here. An index can create indexers, and it is intended to have multiple concurrent indexers running at the same time. Note that unlike Lucene, we don’t have Document or Field classes. Instead, we call methods on the indexer to create a new document and then add fields to the current document. When you are done with a document, you start a new one, or flush to complete the entire operation. For long running indexing, the indexer will flush itself automatically for you.

I think that this API gives us the best approach. It guides you toward using a single instance, with internal optimizations to make it memory efficient. Multiple instances can be used concurrently to speed up indexing. And it knows when to flush itself for you, so you don’t have to worry about that, although you do have to complete the operation by calling Flush() at the end.
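The automatic flushing is described but not shown, so here is one way it could be sketched. This is a guess at the shape, not the Corax implementation: `SketchIndexer`, its document-count threshold and the `FlushCount` property are all assumptions made for illustration (a real indexer might well track memory usage instead), and nothing is actually written to storage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the names and the flush threshold are assumptions.
public class SketchIndexer : IDisposable
{
    private readonly int _maxPendingDocuments;
    private readonly List<KeyValuePair<string, string>> _pendingFields =
        new List<KeyValuePair<string, string>>();
    private int _pendingDocuments;

    // Exposed only so the behavior can be observed in this sketch.
    public int FlushCount { get; private set; }

    public SketchIndexer(int maxPendingDocuments)
    {
        _maxPendingDocuments = maxPendingDocuments;
    }

    public void NewDocument()
    {
        // Starting a new document means the previous one is complete,
        // so this is a safe point to flush automatically.
        if (_pendingDocuments >= _maxPendingDocuments)
            Flush();
        _pendingDocuments++;
    }

    public void AddField(string field, string value)
    {
        if (_pendingDocuments == 0)
            throw new InvalidOperationException("Call NewDocument() before AddField().");
        _pendingFields.Add(new KeyValuePair<string, string>(field, value));
    }

    public void Flush()
    {
        if (_pendingDocuments == 0)
            return;
        // A real indexer would write the buffered terms to storage here.
        _pendingFields.Clear(); // the list is reused, not reallocated
        _pendingDocuments = 0;
        FlushCount++;
    }

    public void Dispose()
    {
        // Assumption: unflushed work is discarded, so the caller must call Flush().
        _pendingFields.Clear();
        _pendingDocuments = 0;
    }
}
```

Because there are no per-document objects, the only state that grows between flushes is the single pending list, and it is cleared and reused rather than reallocated. Since each indexer owns its own buffer, several of them could run side by side without sharing anything until the flush.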

How about searching? That turned out to be pretty similar as well. All you have to do is create a searcher:

    using (var searcher = fti.CreateSearcher())
    {
        QueryResults queryResults = searcher.QueryTop(new TermQuery("Name", "Arava"), 10);
        Console.WriteLine(queryResults.TotalResults);
        foreach (var match in queryResults.Results)
        {
            Console.WriteLine(match.DocumentId + " - " + match.Score);
        }
    }

We create a searcher, and then we can utilize it to perform queries.

So far, this has been all about the API we have. I’ll talk about the actual implementation in my next post.