Distributed counters feature design

time to read 18 min | 3561 words

This is another experiment with longer posts.

Previously, I used the time series example as the bed on which to test some ideas regarding feature design, to explain how we work and in general work out the rough patches along the way. I should probably note that these posts are purely fiction at this point. We have no plans to include a time series feature in RavenDB at this time. I am trying to work out some thoughts in the open and get your feedback.

At any rate, yesterday we had a request for Cassandra style counters at the mailing list. And as long as I am doing feature design series, I thought that I could talk about how I would go about implementing this. Again, consider this fiction, I have no plans of implementing this at this time.

The essence of what we want is to be able to… count stuff. Efficiently, in a distributed manner, with optional support for cross data center replication.

Very roughly, the idea is to have “sub counters”, unique for every node in the system. Whenever you increment the value, we log this to our own sub counter, and then replicate it out. Whenever you read it, we just sum all the data we have from all the sub counters.

Let us outline the various parts of the solution in the same order as the one I used for time series.


A counter is just a named 64 bits signed integer. A counter name can be any string up to 128 printable characters. The external interface of the storage would look like this:

   1: public struct CounterIncrement
   2: {
   3:     public string Name;
   4:     public long Change;
   5: }
   7: public struct Counter
   8: {
   9:     public string Name;
  10:     public string Source;
  11:     public long Value;
  12: }
  14: public interface ICounterStorage
  15: {
  16:     void LocalIncrementBatch(CounterIncrement[] batch);
  18:     Counter[] Read(string name);
  20:     void ReplicatedUpdates(Counter[] updates);
  21: }

As you can see, this gives us very simple interface for the storage. We can either change the data locally (which modify our own storage) or we can get an update from a replica about its changes.

There really isn’t much more to it, to be fair. The LocalIncrementBatch() increment a local value, and Read() will return all the values for a counter. There is a little bit of trickery involved in how exactly one would store the counter values.

For now, I think we’ll store each counter as two step values. We’ll have a tree of multi tree values that will carry each value from each source. That means that a counter will take roughly 4KB or so. This is easy to work with and nicely fit the model Voron uses internally.

Note that we’ll outline additional requirement for storage (searching for counter by prefix, iterating over counters, addresses of other servers, stats, etc) below. I’m not showing them here because they aren’t the major issue yet.

Over the wire

Skipping out on any optimizations that might be required, we will expose the following endpoints:

  • GET /counters/read?id=users/1/visits&users/1/posts <—will return json response with all the relevant values (already summed up).
    { “users/1/visits”: 43, “users/1/posts”: 3 }
  • GET /counters/read?id=users/1/visits&users/1/1/posts&raw=true <—will return json response with all the relevant values, per source.
    { “users/1/visits”: {“rvn1”: 21, “rvn2”: 22 } , “users/1/posts”:  { “rvn1”: 2, “rvn3”: 1 } }
  • POST /counters/increment <– allows to increment counters. The request is a json array of the counter name and the change.

For a real system, you’ll probably need a lot more stuff, metrics, stats, etc. But this is the high level design, so this would be enough.

Note that we are skipping the high performance stream based writes we outlined for time series. We’ll probably won’t need them, so that doesn’t matter, but they are an option if we need them.

System behavior

This is where it is really not interesting, there is very little behavior here, actually. We only have to read the data from the storage, sum it up, and send it to the user. Hardly what I’ll call business logic.

Client API

The client API will probably look something like this:

   1: counters.Increment("users/1/posts");
   2: counters.Increment("users/1/visits", 4);
   4: using(var batch = counters.Batch())
   5: {
   6:     batch.Increment("users/1/posts");
   7:     batch.Increment("users/1/visits",5);
   8:     batch.Submit();
   9: }

Note that we’re offering both batch and single API. We’ll likely also want to offer a fire & forget style, which will be able to offer even better performance (because they could do batching across more than a single thread), but that is out of scope for now.

For simplicity sake, we are going to have the client just a container for all of endpoints that it knows about. The container would be responsible for… updating the client visible topology, selecting the best server to use at any given point, etc.

User interface

There isn’t much to it. Just show a list of counter values in a list. Allow to search by prefix, allow to dive into a particular counter and read its raw values, but that is about it. Oh, and allow to delete a counter.

Deleting data

Honestly, I really hate deletes. They are very expensive to handle properly the moment you have more than a single node. In this case, there is an inherent race condition between a delete going out and another node getting an increment. And then there is the issue of what happens if you had a node down when you did the delete, etc.

This just sucks. Deletion are handled normally, (with the race condition caveat, obviously), and I’ll discuss how we replicate them in a bit.

High availability / scale out

By definition, we actually don’t want to have storage replication here. Either log shipping or consensus based. We actually do want to have different values, because we are going to be modifying things independently on many servers.

That means that we need to do replication at the database level. And that leads to some interesting questions. Again, the hard part here is the deletes. Actually, the really hard part is what we are going to do with the New Server Problem.

The New Server Problem dictates how we are going to bring a new server into the cluster. If we could fix the size of the cluster, that would make things a lot easier. However, we are actually interested in being able to dynamically grow the cluster size.

Therefor, there are only two real ways to do it:

  • Add a new empty node to the cluster, and have it be filled from all the other servers.
  • Add a new node by backing up an existing node, and restoring as a new node.

RavenDB, for example, follows the first option. But it means that in needs to track a lot more information. The second option is actually a lot simpler, because we don’t need to care about keeping around old data.

However, this means that the process of bringing up a new server would now be:

  1. Update all nodes in the cluster with the new node address (node isn’t up yet, replication to it will fail and be queued).
  2. Backup an existing node and restore at the new node.
  3. Start the new node.

The order of steps is quite important. And it would be easy to get it wrong. Also, on large systems, backup & restore can take a long time. Operationally speaking, I would much rather just be able to do something like, bring a new node into the cluster in “silent” mode. That is, it would get information from all the other nodes, and I can “flip the switch” and make it visible to clients at any point in time.  That is how you do it with RavenDB, and it is an incredibly powerful system, when used properly.

That means that for all intents and purposes, we don’t do real deletes. What we’ll actually do is replace the counter value with delete marker. This turns deletes into a much simple “just another write”. It has the sad implication of not free disk space on deletes, but deletes tend to be rare, and it is usually fine to add a “purge” admin option that can be run on as needed basis.

But that brings us to an interesting issue, how do we actually handle replication.

The topology map

To simplify things, we are going to go with one way replication from a node to another. That allows complex topologies like master-master, cluster-cluster, replication chain, etc. But in the end, this is all about a single node replication to another.

The first question to ask is, are we going to replicate just our local changes, or are we going to have to replicate external changes as well? The problem with replicating external changes is that you may have the following topology:


Now, Server A got a value and sent it to Server B. Server B then forwarded it to Server C. However, at that point, we also have a the value from Server A replicated directly to Server C. Which value is it supposed to pick? And what about a scenario where you have more complex topology?

In general, because in this type of system, we can have any node accept writes, and we actually desire this to be the case, we don’t want this behavior. We want to only replicate local data, not all the data.

Of course, that leads to an annoying question, what happens if we have a 3 node cluster, and one node fails catastrophically. We can bring a new node in, and the other two nodes will be able to fill in their values via replication, but what about the node that is down? The data isn’t gone, it is still right there in the other two nodes, but we need a way to pull it out.

Therefor, I think that the best option would be to say that nodes only replicate their local state, except in the case of a new node. A new node will be told the address of an existing node in the cluster, at which point it will:

  • Register itself in all the nodes in the cluster (discoverable from the existing node). This assumes a standard two way replication link between all servers, if this isn’t the case, the operators would have the responsibility to setup the actual replication semantics on their own.
  • New node now starts getting updates from all the nodes in the cluster. It keeps them in a log for now, not doing anything yet.
  • Ask that node for a complete update of all of its current state.
  • When it has all the complete state of the existing node, it replays all of the remembered logs that it didn’t have a chance to apply yet.
  • Then it announces that it is in a valid state to start accepting client connections.

Note that this process is likely to be very sensitive to high data volumes. That is why you’ll usually want to select a backup node to read from, and that decision is an ops decision.

You’ll also want to be able to report extensively on the current status of the node, since this can take a while, and ops will be watching this very closely.

Server Name

A node requires a unique name. We can use guids, but those aren’t readable, so we can use machine name + port, but those can change. Ideally, we can require the user to set us up with a unique name. That is important for readability and for being able to alter see all the values we have in all the nodes. It is important that names are never repeated, so we’ll probably have a guid there anyway, just to be on the safe side.

Actual Replication Semantics

Since we have the New Server Problem down to an automated process, we can choose the drastically simpler model of just having an internal queue per each replication destination. Whenever we make a change, we also make a note of that in the queue for that destination, then we start an async replication process to that server, sending all of our updates there.

It is always safe to overwrite data using replication, because we are overwriting our own data, never anyone else.

And… that is about it, actually. There are probably a lot of details that I am missing / would discover if we were to actually implement this. But I think that this is a pretty good idea about what this feature is about.