Ayende @ Rahien

It's a girl

Large scale distributed consensus approaches: Concurrent consistent decisions

So far we tackled the idea of a large compute cluster, and a large storage cluster. I mentioned that the problem with the large storage cluster is that it doesn’t handle consistency within itself. Two concurrent requests can hit two storage nodes and make concurrent operations that aren’t synchronized between themselves. That is usually a good thing, since that is what you want for a high throughput system. The less coordination you can get away with, the more you can actually do.

So far, so good, but that isn’t always suitable. Let us consider a case where we need to have a consistent approach, for some business reason. The typical example would be transactions in a bank, but I hate this example, because in the real world banks deal with inconsistency all the time; it is an explicit part of their business model. Let us talk about auctions and bids instead. We have an auction service, which allows us to run a large number of auctions.

For each auction, users can place bids, and it is important for us that bids are always processed sequentially per auction, because we have to know who placed a bid that was immediately rejected ($1 commission) or a winning bid that was later overbid (no commission except for the actual winner). We’ll leave aside the fact that this is something that we can absolutely figure out from the auction history and say that we need to have it immediate and consistent. How do we go about doing this?

Remember, we have enough load on the system that we are running a cluster with a hundred nodes in it. The rough topology is still this:


We have the consensus cluster, which decides on the network topology. In other words, it decides which set of servers is responsible for which auction. What happens next is where it gets interesting.

Instead of just a set of cooperating nodes that share the data between them and each of which can accept both reads and writes, we are going to twist things a bit. Each set of servers is its own consensus cluster for that particular auction. In other words, we first go to the root consensus cluster to get the topology information, then we add another command to the auction’s log. That command goes through the same distributed consensus algorithm between the three nodes. The overall cluster is composed of many consensus clusters, one for each auction.

This means that we have a fully consistent set of operations across the entire cluster, even in the presence of failure. Which is quite nice. The problem here is that you have to have a good way to distinguish between the different consensuses. In this case, an auction is the key per consensus, but it isn’t always so easy to make such a distinction, and it is important that an auction cannot grow large enough to overwhelm the set of servers that it is actually using. In those cases, you can’t really do much beyond relaxing the constraints and going about it in a different manner.

For optimization purposes, you usually don’t run an independent consensus for each of the auctions. Or rather, you do, but you make sure that they share the same communication resources. So for auctions/123 the nodes are D,E,U with E being the leader, while for auctions/321 the nodes are also D,E,U but U is the leader. This gives you the ability to spread processing power among the cluster, and the communication channels (TCP connections, for example) are shared between both auctions’ consensuses.
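
To make the shape of this a bit more concrete, here is a minimal Python sketch of the routing side only. It does not implement consensus at all; the `AuctionGroup` class, the `topology` dictionary and `place_bid` are names I made up for illustration, and the per-auction log simply stands in for whatever the real consensus implementation would replicate between D, E and U.

```python
from dataclasses import dataclass, field

@dataclass
class AuctionGroup:
    nodes: tuple                      # the servers that hold this auction
    leader: str                       # the node that orders commands for it
    log: list = field(default_factory=list)   # ordered, per-auction command log

    def append(self, node, command):
        # Only the leader may order commands; anyone else has to redirect.
        if node != self.leader:
            raise ValueError(f"not the leader, try {self.leader}")
        self.log.append(command)
        return len(self.log) - 1      # position of the command in this auction's log

# Hypothetical topology, as the root consensus cluster might publish it.
topology = {
    "auctions/123": AuctionGroup(nodes=("D", "E", "U"), leader="E"),
    "auctions/321": AuctionGroup(nodes=("D", "E", "U"), leader="U"),
}

def place_bid(auction_id, node, bid):
    # Look up the group in the topology, then append to that auction's log.
    return topology[auction_id].append(node, bid)
```

Both auctions live on the same three machines, but each has its own log and its own leader, so bids for auctions/123 are ordered independently of bids for auctions/321.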



Large scale distributed consensus approaches: Large data sets

In my previous post, I talked about how we can design a large cluster for compute bound operations. The nice thing about this is that the actual amount of shared data that you need is pretty small, and you can just distribute that information among your nodes, then let them do stateless computation on that, and you are done.

A much more common scenario is when we can’t just do stateless operations, but need to keep track of what is actually going on. The typical example is a set of users changing data. For example, let us say that we want to keep track of the pages each user visits on our site. (Yes, that is a pretty classic Big Table scenario, I’ll ignore the prior art issue for now). How would we design such a system?

Well, we still have the same considerations. We don’t want a single point of failure, and we want to have a very large number of machines and make the most of their resources.

In this case, we are merely going to change the way we look at the data. We still have the following topology:


There is the consensus cluster, which is responsible for cluster wide immediately consistent operations. And there are all the other nodes, which actually handle processing requests and keeping the data.

What kind of decisions do we get to make in the consensus cluster? Those would be:

  • Adding & removing nodes from the entire cluster.
  • Changing the distribution of the data in the cluster.

In other words, the state that the consensus cluster is responsible for is the entire cluster topology. When a request comes in, the cluster topology is used to decide which set of nodes to direct it to.

Typically in such systems, we want to keep the data on three separate nodes, so we get a request, then route it to one of the three nodes that hold the matching data. This is done by sharding the data according to the actual user id whose page views we are trying to track.
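
As a rough illustration, routing by user id could look something like the Python sketch below. The `SHARDS` table, the node names and the choice of SHA-256 as the hash are all assumptions of mine; in the design described here the table would come from the consensus cluster’s topology.

```python
import hashlib

# Hypothetical topology: shard number -> the three nodes holding that shard.
SHARDS = {
    0: ("node-01", "node-02", "node-03"),
    1: ("node-04", "node-05", "node-06"),
    2: ("node-07", "node-08", "node-09"),
}

def shard_for(user_id: str) -> int:
    # Use a stable hash; Python's built-in hash() is randomized per process.
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % len(SHARDS)

def nodes_for(user_id: str):
    # Any of the returned nodes can accept the request for this user.
    return SHARDS[shard_for(user_id)]
```

Every node that has the same topology will send the same user to the same set of three nodes, without having to ask anyone.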

Distributing the sharding configuration is done as described in the compute cluster example, and the actual handling of requests, or sending the data between the sharded instances is handled by the cluster nodes directly.

Note that in this scenario, you cannot ensure any kind of safety. Two requests for the same user might hit different nodes, and do separate operations without being able to consider the concurrent operation. Usually, that is a good thing, but that isn’t always the case. But that is an issue for the next post.



Large scale distributed consensus approaches: Computing with a hundred node cluster

I’m using a 100/99 node cluster as the example, but the discussion also applies to smaller clusters (dozens of nodes) and bigger clusters (hundreds or thousands). Pretty much the only reason that you want to go with clusters of that size is that you want to scale out your processing in some manner. I’ve already discussed why a hundred node cluster isn’t a good option for safety reasons.

Consensus algorithms create a single consensus in the entire cluster, usually about an ordered set of operations that are fed to a state machine. The easiest such example would be a dictionary. But it makes no sense to have a single dictionary spread across a hundred nodes. Why would you need to do that? How would it give you the ability to make full use of all of the power of all those nodes?

Usually nodes are used for either computing or storage purposes. Computing is much easier, so let us take that as a good example. A route calculating system needs to do a lot of computations on a relatively small amount of information (the map data). Whenever there is a change in the map (route blocked, new road open, etc), it needs to send the information to all the servers, and make sure that it isn’t lost.

Since calculating routes is expensive (we’ll ignore the options for optimizations and caching for now), we want to scale it to many nodes. And since the source data is relatively small, each node can have a full copy of the data. Under this scenario, the actual problem we have to solve is how to ensure that once we save something to the cluster, it is propagated to the entire cluster.

The obvious way to do this is with a hierarchy:


Basically, the big icons are the top cluster, each of which is responsible for updating a set of secondary servers, which are then responsible for updating the tertiary servers.

To be perfectly honest, this looks nice, and even reasonable, but it is going to cause a lot of issues. Sure, the top cluster is resilient to failures, but relying on a node to be up to notify other nodes isn’t so smart. If one of the nodes in the top cluster goes down, then we have about 20% of our cluster that didn’t get the notice, which kind of sucks.

A better approach would be to go with a management system and a gossip background:


In other words, the actual decisions are done by the big guys (literally, in this picture). This is a standard consensus cluster (Paxos, Raft, etc). Once a decision has been made by the cluster, we need to send it to the rest of the nodes in the system. We can do that either by just sending the messages to all the nodes, or by selecting a few nodes and having them send the messages to their peers. The protocol for that is something like: “What is the last command id you have? Here is what I have after that.” Assuming that each processing node is connected to a few other servers, that means that we can send the information very quickly to the entire cluster. And even if there are errors, the gossiping servers will correct them (note that there is an absolute order of the commands, ensured by the consensus cluster, so there isn’t an issue about agreeing to this, just distributing the data).
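
Here is a small Python sketch of that exchange, under a few assumptions of mine: command ids are just positions in the log, every node’s log is a prefix of the agreed order (which is what the consensus cluster gives us), and the peers are arranged in a simple ring. The `Node` class and `gossip_round` are illustrative names, not part of any real library.

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.log = []   # commands in the order fixed by the consensus cluster

    def last_command_id(self):
        return len(self.log) - 1          # -1 means "I have nothing yet"

    def commands_after(self, command_id):
        return self.log[command_id + 1:]

    def gossip_with(self, peer):
        # "What is the last command id you have? Here is what I have after that."
        peer.log.extend(self.commands_after(peer.last_command_id()))
        # And the same question in the other direction.
        self.log.extend(peer.commands_after(self.last_command_id()))

def gossip_round(nodes):
    # Hypothetical ring topology: each node talks to its next neighbour.
    for i, node in enumerate(nodes):
        node.gossip_with(nodes[(i + 1) % len(nodes)])
```

Because the order of commands is already decided, the nodes never have to argue about what goes where; whoever is behind just appends what it is missing.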

Usually the gossip topology follows the actual physical distribution. So the consensus cluster will notify a couple of servers on each rack, and let the servers in the rack gossip among themselves about the new value.

This means that once we send a command to the cluster, the consensus would agree on that, then we would distribute it to the rest of the nodes. There is a gap between the consensus confirming it and the actual distribution to all the nodes, but that is expected in any distributed system. If it is important that the change takes effect at the same time across the entire cluster, the command is usually time activated (which requires clock sync, but that is something that we can blame on the ops team, so we don’t care :-) ).

With this system, we can have an eventually consistent set of data across the entire cluster, and we are happy.

Of course, this is something that is only relevant for compute clusters, the kind of things where you compute a result, return it to the client and that is about it. There are other types of clusters, but I’ll talk about them in my next post.



Large scale distributed consensus approaches: Calculating a way out

A question crossed my desk, and it was interesting enough that I felt it deserves a post. The underlying scenario is this. We have distributed consensus protocols that are built to make sure that we can properly arrive at a decision and have the entire cluster follow it, regardless of failure. Those are things like Paxos or Raft. The problem is that those protocols are all aimed at a relatively small number of nodes. Typically 3 – 5. What happens if we need to manage a large number of machines?

Let us assume that we have a cluster of 99 machines. What would happen under this scenario? Well, all consensus algorithms work on top of the notion of a quorum: at least (N/2+1) machines have the same data. For a 3 node cluster, that means that any decision that is on 2 machines is committed, and for a 5 node cluster, it means that any decision that is on 3 machines is committed. What about 99 nodes? Well, a decision would have to be on 50 machines to be committed.

That means making 196 requests (98 x 2) (once for the command, then for the confirmation) for each command. That… is a lot of requests. And I’m not sure that I want to see what it would look like in terms of perf. So just scaling things out in this manner is out.
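
The arithmetic is simple enough to write down. This is just a sketch of the counting above, assuming a single leader that talks to every other node twice per command; the function names are mine.

```python
def quorum(nodes: int) -> int:
    # N/2+1 with integer division: the smallest majority of the cluster.
    return nodes // 2 + 1

def requests_per_command(nodes: int) -> int:
    # The leader contacts every other node twice: once with the command,
    # then again with the confirmation.
    return (nodes - 1) * 2

for n in (3, 5, 99):
    print(n, quorum(n), requests_per_command(n))
```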

In fact, this is also a pretty strange thing to do. The notion of distributed consensus is that you will reach agreement on a state machine. The easiest way to think about it is that you reach agreement on a set of values among all nodes. But why are you sharing those values among so many nodes? It isn’t for safety, that is for sure.

Assume that we have a cluster of 5 nodes, with each node having 99% availability (which translates to about 3.5 days of downtime per year). The chance that all of the nodes in the cluster are up at the same time is about 95%, which means that for about 18 days a year, at least one node is down.

But we don’t need them to all be up. We just need any three of them to be up. That means that the math is going to be much nicer for us (see here for an actual discussion of the math).

In other words, here are the availability numbers if each node has a 99% availability:

Number of nodes   Quorum   Availability          Downtime
3                 2        99.97%                ~ 2.5 hours per year
5                 3        99.999% (5 nines)     ~ 5 minutes per year
7                 4        99.9999% (6 nines)    ~ 12 seconds per year
99                50       100%

Note that all of this is based around each node having about 3.5 days of downtime per year. If we can have availability of 99.9% (or about 9 hours a year), the availability story is:

Number of nodes   Quorum   Availability            Downtime
3                 2        99.9997%                ~ 2 minutes a year
5                 3        99.999999% (8 nines)    ~ 0.3 seconds per year
7                 4        100%
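
If you want to play with the numbers yourself, the following Python sketch computes the chance that at least a quorum of nodes is up. It assumes that node failures are independent of each other, which is my simplification (in the real world, racks and power supplies tend to disagree); the function names are made up for this example.

```python
from math import comb

def cluster_availability(nodes: int, node_availability: float) -> float:
    """Probability that at least a quorum of nodes is up (independent failures assumed)."""
    quorum = nodes // 2 + 1
    p = node_availability
    # Sum the binomial probabilities for "exactly `up` nodes are up".
    return sum(comb(nodes, up) * p**up * (1 - p)**(nodes - up)
               for up in range(quorum, nodes + 1))

def downtime_seconds_per_year(availability: float) -> float:
    return (1 - availability) * 365 * 24 * 3600

for n in (3, 5, 7):
    a = cluster_availability(n, 0.99)
    print(n, f"{a:.7%}", round(downtime_seconds_per_year(a), 1), "seconds per year")
```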

So in rough terms, we can say that going to a 99 node cluster isn’t a good idea. It is quite costly in terms of the number of operations required to ensure a commit, and from a safety perspective, you can get the same safety level at a drastically lower cost.

But there is now another question, what would we actually want to do with a 99 node cluster*? I’ll talk about this in my next post.

* A hundred node cluster only makes sense if you have machines with about 80% availability. In other words, they are down for 2.5 months every year. I don’t think that this is a scenario worth discussing.



Modeling exercise: The grocery store’s checkout model process approach

I posted about the grocery store checkout process exercise before. Now I want to see if I can do a short outline on how I would handle this.

The key aspect from my perspective is that we need to separate the notion of the data we have and the processing of the data. That means that we are going to have the following model:

public class ShoppingCart
{
   public List<ProductInShoppingCart> Products { get; set; }
   public List<Discount> Discounts { get; set; }
}

public class ProductInShoppingCart
{
   public string ProductId;
   public Discount Discount;
}

Note that we explicitly do not have a quantity field here. If we purchase 6 bottles of milk, that would appear six times in the cart. Why is that?

Let us assume that we have a sale for 2 bottles of milk for 20% discount or a 3+1 bottles of milk offer. Consider the kind of code you would have to write in the offer code:

  • Find all products that have this offer and have 4 items without discount.
  • Add the discount to those products.
  • After searching for products without discount, need to search for products with a discount, but that we can apply this to and get a better option.

In this case, we start by doing:

  • Add bottle of milk
  • Add bottle of milk – 2 for 20% discount is triggered.
  • Add bottle of milk
  • Add bottle of milk – 3+1 offer is triggered, removing the previous discount.

Because this is likely going to be complex, I’m going to be writing this once. A set of offers and the kind of rules that we want. Then we will give the users the ability to define those rules.

Note that we keep the raw data (products) and the transformations (discounts) separate, so we can always reapply everything without losing any data.
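
To show why one entry per unit makes this easier, here is a rough sketch of the milk example. It is in Python rather than C# purely to keep it short, and everything in it is an assumption for illustration: the `products/milk` id, the offer names, and the rule that a full set of four bottles takes the 3+1 offer while leftover pairs take the 20% one.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ProductInCart:
    product_id: str
    discount: Optional[str] = None   # name of the offer applied to this unit

def apply_milk_offers(cart):
    """Recompute discounts from the raw products; earlier discounts are discarded."""
    for item in cart:
        item.discount = None                      # reapply everything from scratch
    milk = [item for item in cart if item.product_id == "products/milk"]
    in_sets = len(milk) // 4 * 4                  # bottles that form full sets of four
    for item in milk[:in_sets]:
        item.discount = "3+1"
    rest = milk[in_sets:]
    for item in rest[: len(rest) // 2 * 2]:       # leftover bottles, in pairs
        item.discount = "2 for 20% off"
    return cart
```

Adding the fourth bottle doesn’t require finding and undoing the earlier 20% discount. We throw the discounts away and compute them again from the products, which is only safe because the raw data was never touched.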

Modeling exercise: The grocery store’s checkout model

I went to the supermarket yesterday, and I forgot to get out of work mode, so here is this post. The grocery store checkout model exercise deals with the following scenario. You have a customer that is scanning products in a self checkout lane, and you need to process the order.

In terms of external environment, you have:

  • ProductScanned ( ProductId: string ) event
  • Complete Order command
  • Products ( Product Id –> Name, Price ) dataset

So far, this is easy, however, you also need to take into account:

  • Sales (1+1, 2+1, 5% off for store brands, 10% off for store brands for loyalty card holders).
  • Purchase of items by weight (apples, bananas, etc).
  • Per customer discount for 5 items.
  • Rules such as alcohol can only be purchased after store clerk authorization.
  • Purchase limits (can only purchase up to 6 items of the same type, except for specific common products)

The nice thing about such an exercise is that it forces you to see how many things you have to juggle for such a seemingly simple scenario.

A result of this would be to see how you would handle relatively complex rules. Given the number of rules we already have, it should be obvious that there are going to be more, and that they are going to be changing on a fairly frequent basis. A better model would be to actually do this over time. So you start with just getting the first part, then you start streaming the other requirements, but what you actually see is the changes in the code over time. So each new requirement causes you to make modifications and accommodate the new behavior.

The end result might be a Git repository that allows you to see the full approach that was used and how it changed over time. Ideally, you should see a lot of churn in the beginning, but then you’ll have a lot less work to do as your architecture settles down.

On site Architecture & RavenDB consulting availabilities: Malmo & New York City

I’m going to have availability for on site consulting in Malmo, Sweden (17 Sep) and in New York City, NY (end of Sep – beginning of Oct).

If you want me to come by and discuss what you are doing (architecture, NHibernate or RavenDB), please drop me a line.

I’m especially interested in people who need to do “strange” things with data and data access. We are building a set of tailored database solutions for customers now, and we have seen customers show x750 improvement in performance when we gave them a database that was designed to fit their exact needs, instead of having to contort their application and their database to a seven dimensional loop just to try to store and read what they needed.

The fallacy of distributed transactions

This can be a very short post, just: See CAP. Unfortunately, we have a lot of people who actually have experience in using distributed transactions, and have a good reason to believe that they work. The answer is that yes, they do, as long as you don’t run into some of the interesting edge cases.

By the way, that is not a new observation, see The Two Generals.

Allow me to demonstrate. Assume that we have a distributed system with the following actors:


This is a fairly typical setup. You have a worker that pulls messages from a queue and reads/writes to a database based on those messages. To coordinate between them, it uses a transaction coordinator such as MSDTC.

Transaction coordinators use a two phase commit (or sometimes a three phase commit) protocol to ensure that either all the data would be committed, or none of it would be.

The general logic goes like this:

  • For each participant in the transaction, send a prepare message.
    • If the participant answered “prepared”, it is guaranteeing that the transaction can be committed.
  • If any of the participants failed on prepare, abort the whole transaction.
  • If all of the participants successfully prepared, send the commit message to all of them.

The actual details are a bit more involved, obviously, but that is pretty much it.
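
As a toy illustration of those steps, and nothing more than that, here is a Python sketch in which everything lives in one process and nothing ever fails half way. `Participant` and `two_phase_commit` are names I invented; this is not how MSDTC is implemented.

```python
class Participant:
    def __init__(self, name, can_prepare=True):
        self.name = name
        self.can_prepare = can_prepare
        self.state = "active"

    def prepare(self):
        # Answering "prepared" is a promise that a later commit will succeed.
        self.state = "prepared" if self.can_prepare else "aborted"
        return self.can_prepare

    def commit(self):
        self.state = "committed"

    def abort(self):
        self.state = "aborted"

def two_phase_commit(participants):
    # Phase 1: ask every participant to prepare.
    for p in participants:
        if not p.prepare():
            # Any failure on prepare aborts the whole transaction.
            for q in participants:
                q.abort()
            return False
    # Phase 2: everyone prepared, so tell everyone to commit.
    for p in participants:
        p.commit()
    return True
```

Note what the sketch quietly assumes: that the loop in phase 2 always gets to run to the end.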

Now, let us take a look at an interesting scenario. Worker #1 is pulling (in a distributed transaction) a message from the queue, and based on that message, it modifies the database. Then it tells the transaction coordinator that it can commit the transaction. At this point, the TC is sending the prepare message to the database and the queue. Both reply that they have successfully prepared the transaction to be committed. The TC sends a commit message to the queue, completing the transaction from its point of view. However, at this point, it is unable to communicate with the database.

What happens now?

Before I answer that, let us look at another such scenario. The TC needs to commit a transaction, it sends a prepare message to the database, and receives a successful reply. However, before it manages to send a prepare message to the queue, it becomes unavailable.

Note that from the point of view of the database, the situation looks exactly the same. It got (and successfully replied to) a Prepare message, then it didn’t hear back from the transaction coordinator afterward.

Now, that is where it gets interesting. In an abstract world, we can just wait with the pending transaction until the connection with the coordinator is resumed, and we can actually get a “commit / abort” notification.

But we aren’t in an abstract world. When we have such a scenario, we are actually locking records in the database (because they are in the process of being modified). What happens when another client comes to us and wants to modify the same record?

For example, it is quite common to host the business logic, queue and transaction coordinator on the same worker instance, while the database is on a separate machine. That means that in the image above, if Worker #1 isn’t available, we recover by directing all the users to the 2nd worker. However, at that point, we have a transaction that was prepared, but not committed.

When the user continues to make requests to our system, the 2nd worker, which has its own queue and transaction coordinator, is going to try and access the user’s record. The same user whose records are currently locked because of the ongoing transaction.

If we just let it hang in this manner, we have essentially created a situation where the user’s data becomes utterly unavailable (at least for writes). In order to resolve that, transactions come with a timeout. So after the timeout has expired, we can roll back that transaction. Of course, that leads to a very interesting situation.

Let us go back to the first scenario we explored. In this scenario, the queue got both Prepare & Commit messages, while the database got just a Prepare message. The timeout has expired, and the database has rolled back the transaction. In other words, as far as the queue is concerned, the transaction committed, and the message is gone. As far as the database is concerned, that transaction was rolled back, and never happened.
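
The sequence of events is short enough to replay in a few lines of Python. This is a simulation of the story above, not of any real coordinator; the `Resource` class and its `on_timeout` method are hypothetical stand-ins for the queue, the database and the database’s transaction timeout.

```python
class Resource:
    def __init__(self, name):
        self.name = name
        self.state = "active"

    def prepare(self):
        self.state = "prepared"       # locks are now held on the modified records

    def commit(self):
        self.state = "committed"

    def on_timeout(self):
        # A prepared transaction that never hears back is rolled back to release locks.
        if self.state == "prepared":
            self.state = "rolled back"

def run_first_scenario():
    queue, database = Resource("queue"), Resource("database")
    queue.prepare()
    database.prepare()
    queue.commit()                    # the commit reaches the queue...
    # ...but the coordinator can no longer reach the database, so no commit arrives.
    database.on_timeout()
    return queue.state, database.state

print(run_first_scenario())
```

Both participants followed their own rules to the letter, and they still ended up disagreeing about whether the transaction happened.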

Of course, the chance that something like that can happen in one of your systems? Probably one in a million.

Message passing, performance–take 2

In my previous post, I did some rough “benchmarks” to see how message passing options behave. I got some great comments, and I thought I’d expand on that.

The baseline for this was a blocking queue, and using that we managed to get:

145,271,000 msgs in 00:00:10.4597977 for 13,888,510 ops/sec

And the async BufferBlock, using which we got:

43,268,149 msgs in 00:00:10 for 4,326,815 ops/sec.

Using LMAX Disruptor we got a disappointing:

29,791,996 msgs in 00:00:10.0003334 for 2,979,100 ops/sec

However, it was pointed out that I can significantly improve this if I changed the code to be:

var disruptor = new Disruptor.Dsl.Disruptor<Holder>(() => new Holder(), new SingleThreadedClaimStrategy(256), new YieldingWaitStrategy(), TaskScheduler.Default);

After which we get a very nice:

141,501,999 msgs in 00:00:10.0000051 for 14,150,193 ops/sec

Another request I got was for testing this with a concurrent queue, which is actually what it is meant to do. The code is actually the same as the blocking queue, we just changed Bus<string> to ConcurrentQueue<string>.

Using that, we got:

170,726,000 msgs in 00:00:10.0000042 for 17,072,593 ops/sec

And yes, this is pretty much just because I could. Any of those methods is significantly faster than anything close to what I actually need.

Interview challenges: Decompress that

I’m always looking for additional challenges that I can ask people who interview at Hibernating Rhinos, and I ran into an interesting challenge just now.

Implement a Decompress(Stream input, Stream output, byte[][] dictionary) routine for the following protocol:

Prefix byte – composed of the highest bit + 7 bits len

If the highest bit is not set, this is uncompressed data; copy the next len bytes from the input to the output.

If the highest bit is set, this is compressed data; the next byte will be the index in the dictionary, copy len bytes from that dictionary entry to the output.

After writing the code, the next question is going to be, what are the implications of this protocol? What is it going to be good for? What is it going to be bad for?
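
One possible reading of the protocol, as a Python sketch over file-like objects instead of .NET streams. The error handling is my own assumption, since the protocol as stated says nothing about truncated input or a len that is larger than the dictionary entry.

```python
import io

def decompress(inp, out, dictionary):
    while True:
        prefix = inp.read(1)
        if not prefix:
            return                          # clean end of input
        length = prefix[0] & 0x7F           # low 7 bits: len
        if prefix[0] & 0x80:                # highest bit set: dictionary reference
            index = inp.read(1)
            if not index:
                raise EOFError("missing dictionary index")
            entry = dictionary[index[0]]
            if length > len(entry):
                raise ValueError("len is larger than the dictionary entry")
            out.write(entry[:length])
        else:                               # highest bit clear: literal bytes
            data = inp.read(length)
            if len(data) != length:
                raise EOFError("truncated literal")
            out.write(data)

# 0x03 + "abc" is a 3 byte literal; 0x85 0x00 copies 5 bytes from entry 0.
src = io.BytesIO(bytes([0x03]) + b"abc" + bytes([0x85, 0x00]))
dst = io.BytesIO()
decompress(src, dst, [b"hello world"])
print(dst.getvalue())
```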

Your customer isn’t a single entity

An interesting issue came up in the comments for my modeling post. Urmo is saying:

…there are no defined processes, just individual habits (even among people with same set of obligations) with loose coupling on the points where people need to interact. In these companies a software can be a boot that kicks them into more defined and organized operating mode.

This is part of a discussion of software modeling and the kind of thinking you have to do when you approach a system. The problem with Urmo’s approach is that there is a set of implicit assumptions: that the customer is speaking with a single voice, that they actually know what they are doing and that they have the best interests at heart. Yes, it is really hard to create software (or anything, actually) without those, but that happens more frequently than one might desire.

A few years ago I was working on software to manage what was essentially long term temp workers. Long term could be 20 years, and frequently was a number of years. The area in question was caring for invalids, and most of the customers for that company were the elderly. That meant that a worker might no longer be required on a pretty sudden basis (the end customer died, care no longer required).

Anyway, that is the back story. The actual problem we ran into was that by the time the development team got into place there was already a very detailed spec, written by a pretty good analyst after many sessions at a luxury hotel conference room. In other words, the spec cost a lot of money to generate, and involved a lot of people from the company’s management.

What it did not include, however, was feedback from the actual people who had to place the workers at particular people’s homes, and eventually pay them for their work. Little things like the 1st of the month (you have 100s of workers coming in to get their hours approved and get paid) weren’t taken into account. The software was very focused on the individual process, and there were a lot of checks to validate input.

What wasn’t there were things like: “How do I efficiently handle many applicants at the same time?”

The current process was paper form based, and they were basically going over the hours submitted, asking minimal questions, and provisionally approving them. Later on, they would do a more detailed scan of the hours, and do any fixups needed. That would be the time that they would also input the data to their old software. In other words, there was an entire messy process going on that the higher ups didn’t even realize was happening.

This includes decisions such as “you need an advance, we’ll register that as 10 extra hours you worked this month, and we’ll deduct it next month” and “you weren’t supposed to go to Mrs. Xyz, you were supposed to go to Mr. Zabc! We can’t pay for all your hours there”, etc.

When we started working on the software, we happened to do a demo to some of the on site people, and they were horrified by what they saw. The new & improved software would end up causing them many more issues, and it would actually result in more paperwork that they have to manage just so they can make the software happy.

Modeling such things was tough, and at some point (with the client’s reluctant agreement) we essentially threw aside the hundreds of pages of well written spec, and just worked directly with the people who would end up using our software. The solution in the end was to codify many of the actual “business processes” that they were using. Those business processes made sense, and they were what kept the company working for decades. But management didn’t actually realize that they were working in this manner.

And that is leaving aside the “let us change the corporate structure through software” endeavors, which are unfortunately also pretty common.

To summarize, assuming that your client is a single entity, which speaks with one voice and actually knows what they are talking about? Not going to fly for very long. In another case, I had to literally walk a VP of Sales through the process of how a sale is actually happening in his company versus what he thought was happening.

Sometimes this job is like playing a shrink, but for corporations.

Modeling exercise: Flights & Travelers

I just got a really interesting customer inquiry, and I got their approval to share it. The basic problem is booking flights, and how to handle that.

The customer suggested something like the following:

{   // customers/12345
    "Name" : "John Doe",
    "Bookings" : [{
        "FlightId": "flights/1234",
        "BookingId": "1asifyupi",
        "Flight": "EA-4814",
        "From": "Iceland",
        "To" : "Japan",
        "DateBooked" : "2012/1/1"
    }]
}

{   // flights/1234
    "PlaneId": "planes/1234", // centralized miles flown, service history
    "Seats": [{
        "Seat": "F16",
        "BookedBy": "1asifyupi"
    }]
}

But that is probably a… suboptimal way to handle this. Let us go over the type of entities that we have here:

  • Customers / Passengers
  • Flights
  • Planes
  • Booking

The key point in here is that each of those is pretty independent. Note that for simplicity’s sake, I’m assuming that the customer is also the passenger (not true in many cases, a company may pay for your flight, so the company is the customer and you are the passenger).

The actual problem the customer is dealing with is that they have thousands of flights, tens or hundreds of thousands of seats and millions of customers competing for those seats.

Let us see if we can break it down to a model that can work for this scenario. A customer deserves its own document, but I wouldn’t store the bookings directly in the customer document. There are many customers that fly a lot, and they are going to have a lot of bookings there. At the same time, there are many bookings that are made for a lot of people at the same time (an entire family flying).

That leaves the Customer’s document with data about the customer (name, email, phone, passport #, etc) as well as details such as # of miles traveled, the frequent flyer status, etc.

Now, we have the notion of flights and bookings. A flight is a (from, to, time, plane), which contains the number of available seats. Note that we need to explicitly allow for over booking, since that is a common practice for airlines.

There are several places where we have contention here:

  • When ordering, we want to over book up to a certain limit.
  • When seating (usually 24 – 48 hours before the flight) we want to reserve seats.

The good thing about it is that we actually have a relatively small contention on a particular flight. And the way the airline industry works, we don’t actually need a transaction between creating the booking and taking a seat on the flight.

The usual workflow goes something like this:

  • A “reservation” is made for a particular itinerary.
  • That itinerary is held for 24 – 48 hours.
  • That itinerary is sent to the customer for approval.
  • The customer approves and a booking is made; flight reservations are turned into actual booked seats.

The good thing about this is that because a flight can have up to ~600 seats in it, we don’t really have to worry about contention on a single flight. We can just use normal optimistic concurrency and avoid more complex models. That means that we can just retry on concurrency errors and see where that leads us. The breaking of the actual order into reservation and booking also helps, since we don’t have to coordinate between the actual charge and the reservation on the flight.

Overbooking is handled by setting a limit of how much we allow overbooking, and managing the number of booked seats vs. reserved seats. When we look at the customer data, we show the customer document, along with the most recent orders and the stats. When we look at a particular flight, we can get pretty much all of its state from the flight document itself.
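Here is a minimal sketch of that retry loop, written in Go rather than against the RavenDB client API. The Flight fields, the Store type and the Reserve function are all hypothetical names made up for illustration, and the in-memory store only stands in for a document database that rejects writes made against a stale version.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Flight is a hypothetical flight document; Version plays the role of an etag.
type Flight struct {
	Seats         int // physical seats on the plane
	OverbookLimit int // extra seats we are willing to sell
	Reserved      int // seats held by pending reservations
	Booked        int // seats confirmed by customers
	Version       int
}

var (
	ErrConflict = errors.New("concurrency conflict")
	ErrFull     = errors.New("flight is full")
)

// Store is an in-memory stand-in for a document store with optimistic concurrency.
type Store struct {
	mu     sync.Mutex
	flight Flight
}

// Load returns a copy of the current flight document.
func (s *Store) Load() Flight {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.flight
}

// Save writes the document only if nobody changed it since it was loaded.
func (s *Store) Save(f Flight) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if f.Version != s.flight.Version {
		return ErrConflict
	}
	f.Version++
	s.flight = f
	return nil
}

// Reserve holds one seat, reloading and retrying on concurrency conflicts.
func Reserve(s *Store, maxRetries int) error {
	for i := 0; i < maxRetries; i++ {
		f := s.Load()
		if f.Reserved+f.Booked >= f.Seats+f.OverbookLimit {
			return ErrFull
		}
		f.Reserved++
		err := s.Save(f)
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrConflict) {
			return err
		}
	}
	return ErrConflict
}

func main() {
	s := &Store{flight: Flight{Seats: 2, OverbookLimit: 1}}
	for i := 0; i < 4; i++ {
		fmt.Println(Reserve(s, 3))
	}
}
```

Turning a reservation into a booking would be the same kind of load, modify, save cycle (decrement Reserved, increment Booked), which is why no transaction spanning the charge and the flight is needed.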

And the plane’s stats are usually just handled via a map/reduce index on all the flights for that plane.

Now, in the real world, the situation is a bit more complex. We might give out 10 economy seats and 3 business seats to Expedia for a two-month period, so they manage that, and we have partnership agreements with other airlines, and… but I think that this is a good foundation to start building this on.

The contracts of Lazy vs Task

There was a question about our use of Task<T> and Lazy<T> in RavenDB, and I thought that this is a subtle thing that deserves more than a short email.

The basic difference between Lazy<T> and Task<T> is the kind of contract that each of them expresses.

  • Lazy<T> represents a potentially expensive operation that has been deferred. The promise given is that the lazy’s value will be generated the first time that it is needed.
  • Task<T> represents a potentially expensive operation that is currently executing. The promise is that the task’s value will be available on request, hopefully already there by the time you ask.

The major difference is when we actually start the operation. Most often, when we return a task, we return a reference to a scheduled / executing task, which will complete whether or not the task’s result is ever accessed. Conversely, a lazy whose value was never accessed is not something that will ever execute.
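To make the two contracts concrete, here is a small sketch in Go (not .NET, so this is an analogy, not the real Lazy<T> / Task<T> implementation). The Lazy and Task types, NewLazy and StartTask are hypothetical names; the lazy is built on sync.Once and the task on a goroutine plus a channel.

```go
package main

import (
	"fmt"
	"sync"
)

// Lazy defers the work until Value is first called, and runs it at most once.
type Lazy[T any] struct {
	once  sync.Once
	fn    func() T
	value T
}

func NewLazy[T any](fn func() T) *Lazy[T] { return &Lazy[T]{fn: fn} }

// Value runs the deferred function on first access and caches the result.
func (l *Lazy[T]) Value() T {
	l.once.Do(func() { l.value = l.fn() })
	return l.value
}

// Task starts the work immediately; Result blocks until it is done.
type Task[T any] struct {
	done  chan struct{}
	value T
}

// StartTask kicks off fn right away, whether or not anyone asks for the result.
func StartTask[T any](fn func() T) *Task[T] {
	t := &Task[T]{done: make(chan struct{})}
	go func() {
		t.value = fn()
		close(t.done)
	}()
	return t
}

// Result waits for the task to complete and returns its value.
func (t *Task[T]) Result() T {
	<-t.done
	return t.value
}

func main() {
	lazy := NewLazy(func() string {
		fmt.Println("lazy: computing now")
		return "lazy value"
	})
	task := StartTask(func() string { return "task value" })

	fmt.Println(task.Result()) // was already running in the background
	fmt.Println(lazy.Value())  // computed here, on first access
	fmt.Println(lazy.Value())  // cached, the function does not run again
}
```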

The use cases for them tend to be quite different.

Lazy<T> is used a lot of the time as a way to handle once and only once creation (basically, a safe singleton pattern), and for actually deferring the execution of work. Sometimes you can avoid having to do something, and it is easier to give a caller a lazy object and only have to pay for that additional work if they access the data.

Task<T> is primarily used to actually parallelize the work, so you’ll have it running while you are doing other stuff. Usually this is used for I/O, since that can be natively parallelized.

With RavenDB, we use Task<T> as the return type of all our async operations, allowing us to take advantage of the async nature of I/O. And we use Lazy<T> to set up a deferred call to the server. When someone actually accesses one of the lazy’s values, we have to provide them with the answer. At this point, we can go to the server with all the pending lazy operations in a single request, and save a lot of the effort of making separate remote calls to the server.
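The batching idea can be sketched roughly like this, again in Go and with made up names: Server, MultiGet, Session and LazyLoad are hypothetical and are not the RavenDB client API. The sketch is single threaded and ignores errors; it only shows that the first access to any lazy value flushes every pending request in one round trip.

```go
package main

import "fmt"

// Server is a hypothetical remote endpoint that can answer many requests in one round trip.
type Server struct{ RoundTrips int }

// MultiGet pretends to load all the requested documents with a single remote call.
func (s *Server) MultiGet(ids []string) map[string]string {
	s.RoundTrips++
	out := make(map[string]string, len(ids))
	for _, id := range ids {
		out[id] = "doc:" + id
	}
	return out
}

// Session collects deferred loads and sends them together.
type Session struct {
	server  *Server
	pending []string
	results map[string]string
}

func NewSession(s *Server) *Session {
	return &Session{server: s, results: map[string]string{}}
}

// LazyLoad registers the request and returns a function that yields the value on demand.
func (s *Session) LazyLoad(id string) func() string {
	s.pending = append(s.pending, id)
	return func() string {
		if _, ok := s.results[id]; !ok {
			s.flush() // first access: send everything that is pending
		}
		return s.results[id]
	}
}

// flush sends all pending requests in one round trip and caches the answers.
func (s *Session) flush() {
	if len(s.pending) == 0 {
		return
	}
	for id, doc := range s.server.MultiGet(s.pending) {
		s.results[id] = doc
	}
	s.pending = nil
}

func main() {
	server := &Server{}
	session := NewSession(server)
	user := session.LazyLoad("users/1")
	order := session.LazyLoad("orders/7")

	fmt.Println("round trips before access:", server.RoundTrips)
	fmt.Println(user(), order())
	fmt.Println("round trips after access:", server.RoundTrips)
}
```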

In fact, in RavenDB 3.0, we have lazy support in the async API. That means that we have methods that return Lazy<Task<T>>, which means: Give me a deferred operation, that when required, will perform in an async manner. That gives me both the ability to combine requests to the server and avoid blocking up a thread while that I/O is in progress.

Corax: The problem of space

Corax is a research project that we have, to see how we can build a full text search library on top of Voron. Along the way, we take the chance to find out how Lucene does things, and what we can do better. Pretty much from the get go, Corax is likely to use more disk space than Lucene, probably significantly so. I would be happy if we could get a mere 50% increase over Lucene.

The reason that this is the case is that Lucene goes to great lengths to save disk space, from storing all integers in variable length format, to prefix compression, to implicitly referencing data in other files. For example, you can see that when you try reading term positions:

TermPositions are ordered by term (the term is implicit, from the .tis file).

Positions entries are ordered by increasing document number (the document number is implicit from the .frq file).

The downside of saving every little bit is that it is a lot more complex to read the data, requiring multiple caches and complex code paths to actually get it properly. It makes a lot of sense: when Lucene was created, disk space was at a premium. I won’t go as far as to say that disk space doesn’t matter, but given a trade off of using more disk space vs. using more memory / complexity, it is much easier to justify disk space usage today*.

* The caveat here is that you need to be careful, because just accessing the disk can be very slow.

One of the major things that we wanted to deal with in Corax is reducing index corruption issues, and seeing if we can simplify things into a transactional system. As a side effect of that, we don’t need to have index segments, and we don’t need to do merges to free disk space. The problem is that in order to handle this, we need to track additional information that Lucene doesn’t need to.

Let us look at the actual data we keep. Here is a very simple index:

using (var fullTextIndex = new FullTextIndex(StorageEnvironmentOptions.CreateMemoryOnly(), new DefaultAnalyzer()))
using (var indexer = fullTextIndex.CreateIndexer())
{
    indexer.NewDocument();
    indexer.AddField("Name", "Oren Eini");

    indexer.NewDocument();
    indexer.AddField("Name", "Arava Eini");

    indexer.Flush();
}


For each field, we are going to create a multi tree. And for each unique term in the field we have a list of (Index Entry Id, term frequency, boost).

  • @fld_Name
    • arava
      • { 2, 1, 1.0 }  (index id 2, freq 1, boost 1.0)
    • eini
      • { 1,1,1.0 }
      • { 2,1,1.0 }
    • oren
      • { 1,1,1 }
  • @fld_Email
    • arava@houseof.dog
      • { 2,1,1.0 }
    • ayende@ayende.com
      • { 1,1,1.0 }

This is pretty much equivalent to the way Lucene stores things. Possible space optimizations here include not storing default values (term freq or boost of 1), storing index entry ids as variable ints, etc.

The problem is that while this is actually enough for the way Lucene does things, it is not enough for the way Corax does things. Let us consider the case of deleting a document. How would you go about doing this using the information above?

Lucene does this by marking a document id as deleted, and will purge its details on the next segment merge. That works, but only because a segment merge actually reads & writes all of the relevant segments’ data. Without a segment merge, deleting a document is actually something that would require us to scan all the data in the entire database. This is not really practical. Therefore, we need to store additional data so we can delete it later on. In this case, we have the Docs tree, which has keys of (index entry id, field id, term num). This looks like this:

  • Docs
    • [1,1,1]: oren
    • [1,1,2]: eini
    • [1,2,1]: ayende@ayende.com
    • [2,1,1]: arava
    • [2,1,2]: eini
    • [2,2,1]: arava@houseof.dog

Using this information, we can now remove all traces of a document when it is deleted. However, the problem here is that we need to also keep the terms per document in the index. That really blows up the index size, obviously.
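As a rough in-memory illustration of why the extra tree helps, here is a sketch in Go. It uses plain maps instead of Voron trees, and the Index, Posting and FieldTerm types are hypothetical simplifications (the real Docs key is (index entry id, field id, term num); here I just keep a list of field/term pairs per entry). It also assumes all the terms of an entry are added before the next entry starts.

```go
package main

import "fmt"

// Posting mirrors the (index entry id, term frequency, boost) triple.
type Posting struct {
	EntryID int64
	Freq    int
	Boost   float32
}

// FieldTerm records one term that an entry contributed to a field.
type FieldTerm struct{ Field, Term string }

// Index keeps the per-field postings plus the reverse "Docs" mapping.
type Index struct {
	fields map[string]map[string][]Posting // field -> term -> postings
	docs   map[int64][]FieldTerm           // entry id -> terms it contributed
}

func NewIndex() *Index {
	return &Index{
		fields: map[string]map[string][]Posting{},
		docs:   map[int64][]FieldTerm{},
	}
}

// Add indexes the given terms for a field of an entry.
func (ix *Index) Add(entryID int64, field string, terms ...string) {
	byTerm := ix.fields[field]
	if byTerm == nil {
		byTerm = map[string][]Posting{}
		ix.fields[field] = byTerm
	}
	for _, t := range terms {
		ps := byTerm[t]
		if n := len(ps); n > 0 && ps[n-1].EntryID == entryID {
			ps[n-1].Freq++ // same term seen again in this entry
			continue
		}
		byTerm[t] = append(ps, Posting{EntryID: entryID, Freq: 1, Boost: 1.0})
		ix.docs[entryID] = append(ix.docs[entryID], FieldTerm{field, t})
	}
}

// Delete removes an entry by visiting only the terms recorded for it.
func (ix *Index) Delete(entryID int64) {
	for _, ft := range ix.docs[entryID] {
		byTerm := ix.fields[ft.Field]
		ps := byTerm[ft.Term]
		kept := ps[:0]
		for _, p := range ps {
			if p.EntryID != entryID {
				kept = append(kept, p)
			}
		}
		if len(kept) == 0 {
			delete(byTerm, ft.Term)
		} else {
			byTerm[ft.Term] = kept
		}
	}
	delete(ix.docs, entryID)
}

// Postings returns the postings stored for a term in a field.
func (ix *Index) Postings(field, term string) []Posting { return ix.fields[field][term] }

func main() {
	ix := NewIndex()
	ix.Add(1, "Name", "oren", "eini")
	ix.Add(2, "Name", "arava", "eini")
	ix.Delete(1)
	fmt.Println(ix.Postings("Name", "oren"), ix.Postings("Name", "eini"))
}
```

The delete touches only the terms that entry 1 contributed, at the cost of storing every term a second time under the entry.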

The reason for this peculiar way of storing the document fields is that we also want to reuse this information for sorting. When Lucene needs to sort data, it has to read all of the data from the fields, then recreate the values for all relevant documents. Corax can just serve the data already there.

A pretty obvious step to save space would be to track the terms separately, and use an id in the Docs tree, not the full term. That leads to an interesting problem, because we are going to need to be able to go from term –> id and id –> term, which pretty much requires storing them twice, unfortunately.

Final note, Corax is a research project.

Success: From Opening a Champagne Bottle To Hiding Under the Bed with Said Bottle

During the RavenDB courses* in the past few weeks, I was talking with one of the attendees and I came up with what I think is a great analogy.

* Yes, courses, in the past 4 weeks, I’ve given the RavenDB course 3 times. That probably explains why I don’t remember which course it was.

What are your success metrics? From Opening a Champagne Bottle To Hiding Under the Bed with Said Bottle?

The first success metric is when you have enough users (and, presumably, revenue) to cross the threshold to the Big Boys League. Let us call this the 25,000 users range. That is the moment when you throw a party, go to the store and grab a whole case of champagne bottles and make fancy speeches. Of course, the problem with success is that you can have too much of it. A system that does just fine (maybe creaks a little) with 25,000 users is going to behave pretty differently when you have 100,000 users. That is the moment when you find your engineers under the bed, with a half empty bottle of champagne, muttering things about Out Of Capacity errors and refusing to come out until we fire all the users.

In just about any system, you need to define the success points. Twitter was very lucky that it managed to grow even though it had so many problems when its user base exploded. It is far more likely that users will figure out that your service is… well, your engineers are drunk and hiding under the bed, so the service looks accordingly.

And yes, you can try to talk to people about SLA, and metrics and capacities. But I have found that an image like that tends to give you a lot more focused answers. Even if a lot of the time the answer is “I don’t know”. That is a place to start, because this makes it a lot more acute than just “how many req/sec do we need to support?”.



The Corax Experiment: API

I posted before about design practice for how I would approach building a search engine library. I decided to bite the bullet and actually try to do this. Using Voron, that turned out to be a really simple thing to do. Of course, this doesn’t do a tenth of what Lucene does, but it actually does quite a lot. The code is available here, and I want to emphasize again, this is purely an experimental / research project.

The entire thing comes to less than 500 lines of code. And it is pretty functional even at this stage.

Corax is composed of:

  • Analysis
  • Indexing
  • Querying

Analysis of the documents is handled via analyzers:

public interface IAnalyzer
{
    ITokenSource CreateTokenSource(string field, ITokenSource existing);
    bool Process(string field, ITokenSource source);
}

An analyzer creates a token source, which accepts a TextReader and produces tokens. For each token, the Process method is called, and it is used to do things to the relevant token. For example, here is the default analyzer:

public class DefaultAnalyzer : IAnalyzer
{
    readonly IFilter[] _filters =
    {
        new LowerCaseFilter(),
        new RemovePossesiveSuffix(),
        new StopWordsFilter(),
    };

    public ITokenSource CreateTokenSource(string field, ITokenSource existing)
    {
        return existing ?? new StringTokenizer();
    }

    public bool Process(string field, ITokenSource source)
    {
        for (int i = 0; i < _filters.Length; i++)
        {
            if (_filters[i].ProcessTerm(source) == false)
                return false;
        }
        return true;
    }
}

The idea here is to match, fairly closely, what Lucene is doing, but hopefully with clearer code. This analyzer will take a stream of text, break it up into discrete tokens, lower case them, remove the possessive ‘s suffix and clear stop words. Note that each of the filters is actually modifying the token in place. And the tokenizer is pretty simple, but it does the job for now.
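The same pipeline shape can be sketched in a few lines of Go. This is not the Corax code: Token, Filter, Analyze and the tiny stop word list are hypothetical, the tokenizer just splits on whitespace, and only the ASCII apostrophe is handled.

```go
package main

import (
	"fmt"
	"strings"
)

// Token is mutated in place by each filter.
type Token struct{ Text string }

// Filter returns false to drop the token from the output.
type Filter func(t *Token) bool

func lowerCase(t *Token) bool {
	t.Text = strings.ToLower(t.Text)
	return true
}

func removePossessive(t *Token) bool {
	t.Text = strings.TrimSuffix(t.Text, "'s")
	return true
}

// stopWords is a tiny illustrative list, not a real stop word set.
var stopWords = map[string]bool{"the": true, "a": true, "of": true, "and": true}

func dropStopWords(t *Token) bool { return !stopWords[t.Text] }

// Analyze splits on whitespace and runs every token through the filters in order.
func Analyze(text string, filters ...Filter) []string {
	var out []string
	for _, word := range strings.Fields(text) {
		tok := &Token{Text: word}
		keep := true
		for _, f := range filters {
			if !f(tok) {
				keep = false
				break
			}
		}
		if keep {
			out = append(out, tok.Text)
		}
	}
	return out
}

func main() {
	fmt.Println(Analyze("The dog's house of Oren", lowerCase, removePossessive, dropStopWords))
}
```

The order of the filters matters: lower casing has to run before the stop word check, otherwise “The” would slip through.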

Now, let us move to indexing. With Lucene, the recommendation is that you’ll reuse your document and field instance, to avoid creating garbage for the GC. With Corax, I took it a step further:

using (var indexer = fullTextIndex.CreateIndexer())
{
    indexer.NewDocument();
    indexer.AddField("Name", "Oren Eini");

    indexer.NewDocument();
    indexer.AddField("Name", "Ayende Rahien");

    indexer.Flush();
}

There are a couple of things to note here. An index can create indexers, and it is intended to have multiple concurrent indexers running at the same time. Note that unlike Lucene, we don’t have Document or Field classes. Instead, we call methods on the indexer to create a new document and then add fields to the current document. When you are done with a document, you start a new one, or flush to complete the entire operation. For long running indexing, the indexer will flush itself automatically for you.

I think that this API gives us the best approach. It guides you toward using a single instance, with internal optimizations to make it memory efficient. Multiple instances can be used concurrently to speed up indexing time. And it knows when to flush itself for you, so you don’t have to worry about that. Although you do have to complete the operation by calling Flush() at the end.

How about searching? That turned out to be pretty similar as well. All you have to do is create a searcher:

using (var searcher = fti.CreateSearcher())
{
    QueryResults queryResults = searcher.QueryTop(new TermQuery("Name", "Arava"), 10);
    Console.WriteLine(queryResults.TotalResults);
    foreach (var match in queryResults.Results)
    {
        Console.WriteLine(match.DocumentId + " - " + match.Score);
    }
}

We create a searcher, and then we can utilize it to perform queries.

So far, this has been all about the API we have, I’ll talk about the actual implementation in my next post.

Design practice: Building a search engine library

Note: This is done purely as a design practice. We don’t have any current plans to implement this, but I find that it is a good exercise in general.

How would I go about building a search engine for RavenDB to replace Lucene? Well, we have Voron as the basis for storage, so from the get go, we have several interesting changes. To start with, we inherit the transactional properties of Voron, but more importantly, we don’t have to do merges, so we don’t have any of those issues. In other words, we can actually generate stable document ids.

But I’m probably jumping ahead a bit. We’ll start with the basics. Analysis / indexing is going to be done in very much the same way. Users will provide an analyzer and a set of documents, like so:

var index = new Corax.Index(storageOptions, new StandardAnalyzer());

var scope = index.CreateIndexingScope();

long docId = scope.Add(new Document
{
    {"Name", "Oren Eini", Analysis.Analyzer},
    {"Name", "Ayende Rahien", Analysis.Analyzer},
    {"Email", "ayende@ayende.com", Analyzed.AsIs, Stored.Yes}
});

docId = scope.Add(new Document
{
    {"Name", "Arava Eini", Analysis.Analyzer},
    {"Email", "arava@doghouse.com", Analyzed.AsIs, Stored.Yes}
});

index.Submit(scope);

Some notes about this API. It is modeled quite closely after the Lucene API, and it would probably need additional work. The idea here is that you are going to need to get an indexing scope, which is single threaded. And you can have multiple indexing scopes running at the same time. You can batch multiple writes into a single scope, and it behaves like a transaction.

The idea is to turn all of the work associated with indexing the document into single threaded work, which makes it easier for us. Note that we immediately get the generated document id, but that the document will only be available for searching when you have submitted the scope.

Under the hood, this does all of the work at the time of calling Add(). The analyzer will run on the relevant fields, and we will create the appropriate entries. How does that work?

Every document has a long id associated with it. In Voron, we are going to have a ‘Documents’ tree, with the long id as the key, and the value is going to be all the data about the document we need to keep. For example, it would have the stored fields for that document, or the term positions, if required, etc. We’ll also have a Fields tree, which will have a mapping of all the field names to an integer value.

Of more interest is how we are going to deal with the terms and the fields. For each field, we are going to have a tree called “@Name”, “@Email”, etc. Those are multi trees, with the keys in that tree being the terms, and the multi values being the document ids that have those terms. In other words, the code above is going to generate the following data in Voron.

  • Documents tree:
    • 0 – { “Fields”: { 1: “ayende@ayende.com” } }
    • 1 – { “Fields”: { 1: “arava@doghouse.com” } }
  • Fields tree
    • 0 – Name
    • 1 – Email
  • @Name tree
    • ayende – [ 0 ]
    • arava – [ 1 ]
    • oren – [ 0 ]
    • eini – [ 0, 1 ]
    • rahien – [ 0 ]
  • @Email tree
    • ayende@ayende.com – [ 0 ]
    • arava@doghouse.com – [ 1 ]

Given this model, we can now turn to the actual querying part of the business. Let us assume that we have the following query:

var queryResults = index.Query("Name: Oren OR Email: ayende@ayende.com");
foreach (var result in queryResults.Matches)
{
    Console.WriteLine("{0}: {1}", result.Id, result["Email"]);
}

How does querying work? The query above would actually be something like:

new BooleanQuery(
    Match.Or,
    new TermQuery("Name", "Oren"),
    new TermQuery("Email", "ayende@ayende.com")
    )

The actual implementation of this query would just access the relevant field tree and load the values in the particular key. The result is a set of ids for both parts of the query, and since we are using an OR here, we will just union them and return the list of results back.
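A rough Go sketch of that evaluation, using maps in place of the Voron multi trees. The Index, FieldTree and TermQuery types and the Or function here are hypothetical and are not the API shown above; the data is the same as in the trees listed earlier, and the query terms are assumed to have already been run through the analyzer (hence the lower case “oren”).

```go
package main

import (
	"fmt"
	"sort"
)

// FieldTree maps a term to the ids of the documents containing it.
type FieldTree map[string][]int64

// Index maps a field name (for example "Name") to its tree.
type Index map[string]FieldTree

// TermQuery matches a single, already analyzed term in a single field.
type TermQuery struct{ Field, Term string }

// Run returns the ids stored under the term in the field's tree.
func (q TermQuery) Run(ix Index) []int64 { return ix[q.Field][q.Term] }

// Or unions the results of the given term queries into sorted, unique ids.
func Or(ix Index, queries ...TermQuery) []int64 {
	seen := map[int64]bool{}
	var ids []int64
	for _, q := range queries {
		for _, id := range q.Run(ix) {
			if !seen[id] {
				seen[id] = true
				ids = append(ids, id)
			}
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	ix := Index{
		"Name": FieldTree{
			"ayende": {0}, "arava": {1}, "oren": {0}, "eini": {0, 1}, "rahien": {0},
		},
		"Email": FieldTree{
			"ayende@ayende.com": {0}, "arava@doghouse.com": {1},
		},
	}
	fmt.Println(Or(ix, TermQuery{"Name", "oren"}, TermQuery{"Email", "ayende@ayende.com"}))
	fmt.Println(Or(ix, TermQuery{"Name", "arava"}, TermQuery{"Name", "oren"}))
}
```

An AND would be the same lookup followed by an intersection instead of a union.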

Optimizations here can be to run those queries in parallel, or to just cache the results of a particular term query, so we can speed things up even more. This looks simple, and it is, because a lot of the work has already been done in Voron. Searching is pretty much complete, we only need to tell it what to search with.

Raft, as the Raven flies

The interesting thing about the etcd / go-raft review wasn’t so much what they did, they did things quite beautifully, but the things that I wanted them to do that they didn’t do. The actual implementation was fascinating, and for the purposes of what is required, more than adequate. I, however, have a very specific goal in mind, and that means that this is really not going to work with the given implementations.

I’d better explain first. I’m not sure if go-raft was written by the same people as etcd, but they do “smell” very much the same. It just occurred to me that yes, they are actually both on github, so I checked, and it appears that I was correct. xiangli-cmu, philips, benbjohnson and bcwaldon (among many others) have worked on both projects. The reason this is important is that this reinforces my feeling that go-raft was written mainly to serve the needs of etcd. Anyway…

The reason that I think that is that the implementation is very much optimized at the level etcd needs it. What I mean is that there is a lot of work there to support multiple concurrent operations that would be batched to all the nodes in the cluster, then be applied in the in memory model. This works because etcd is a set of key/value pairs that are shared among many nodes, but they are usually very small values, and a very small number of keys. I would be surprised if there were hundreds of thousands of keys in a single etcd installation. That just isn’t the type of thing that it is meant to do. And that reflects throughout.

For example, there is an inherent assumption throughout the codebase that the data set is small, that each value is small, etc. When sending a snapshot over the wire, there is the assumption that it can all fit in memory and that this is a good thing to do. Even more to the point, the system where we only push entries to the nodes on heartbeats is great when you are trying to batch multiple changes together, with each change being independent of all the others (usually). However, that really doesn’t work when what you actually need is something like the sort of things that we usually deal with.

I want to emphasize that I’m really evaluating the way etcd & go-raft are built and finding them wanting for my own purposes. I think that they are doing quite great for the kind of problem that they are meant to solve.

Let us consider the kind of databases that we have. We usually have to deal with much larger values, typical documents are in the KB range, and are frequently hundreds of KB in size. We also tend to have quite a lot of them. Trying to put them all in memory (the etcd way) would be… suboptimal. Now, one option would be to try and do just that, and have each operation in RavenDB (put/delete documents) as a state machine command in Raft. The problem is that this really gets complicated very quickly, leaving aside the fact that it isn’t a general solution, and we really want to try to get to a general solution.

We don’t want to solve this once for RavenDB, and then for RavenFS, and then for… etc. And the reason that working at that level is tricky is that you need to push everything through the state machine, and everything in this case really does mean everything. That leads to a very different database design and implementation. It is also probably not really necessary. We can drop this a level or two down and instead of dealing at the database level, we can deal with that at the storage layer level. This is basically just an extension of the log shipping idea. Instead of using Raft for high level commands, just use Raft to create a consensus around the sequence of writes to the journal. That means that all the nodes will have the same journal, and thus have the same data in the storage.

That in turn means that an “entry” can actually be pretty large (multi MB range, sometimes). And because the journal is purely sequential, we need to issue that command to all the nodes as soon as it is submitted to the Raft state machine.

Now, the reason that the go-raft implementation waits for the heartbeat is that it batches things up nicely, and really speeds up the general case. But we don’t need to do that for what we are going to do. Or, to be rather more exact, we have already done just that. That is basically what Voron’s transaction merging is all about. And since we can only handle writes as the leader, and since we will merge concurrent writes anyway, that is going to end up as a very similar behavior under load, and faster without load. At least that is what the initial thinking is saying.

Snapshots, also something quite important for Raft, can be handled very simply by just doing a backup/restore over the network, by streaming all the data.

And the rest is just implementing Raft.


Reviewing go-raft, part II

In my previous post, I started to go over the go-raft implementation. After being interrupted by the need to sleep, I decided to go on with this, but I wanted to expand a bit first on the issue we discussed earlier: not checking the number of bytes read in log_entry’s Decode.


Let us assume that we actually hit that issue, what would happen?


The process goes like this: we try to read a value, but the Read method only returns some of the information. We explicitly ignore that, and try to use the buffer anyway. Best case scenario, we actually get an error, so we bail early. At that point, we detect the error and truncate the file. Hello data loss, nice to see you. For fun, this is the best case scenario. It is worse if we unmarshal the partial data without an error. Then we don’t have the case of “oh, we have a node that is somehow way behind”, we have the case of “this node actually applied different commands than anyone else”.
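To show the general shape of the problem (this is not the go-raft Decode code), here is a small Go sketch. chunkReader and shortReader are hypothetical helpers that imitate a file or socket returning fewer bytes than requested; readNaive ignores the count returned by Read, while readExact uses the standard io.ReadFull, which keeps reading until the buffer is full or an error occurs.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// chunkReader hands out at most n bytes per Read call,
// imitating a file or socket that returns short reads.
type chunkReader struct {
	r io.Reader
	n int
}

func (c chunkReader) Read(p []byte) (int, error) {
	if len(p) > c.n {
		p = p[:c.n]
	}
	return c.r.Read(p)
}

// shortReader wraps data in a reader that returns at most n bytes at a time.
func shortReader(data string, n int) io.Reader {
	return chunkReader{r: strings.NewReader(data), n: n}
}

// readNaive ignores the byte count, so the tail of the buffer may stay zeroed.
func readNaive(r io.Reader, size int) []byte {
	buf := make([]byte, size)
	_, _ = r.Read(buf) // may fill only part of buf
	return buf
}

// readExact keeps reading until the buffer is full or an error occurs.
func readExact(r io.Reader, size int) ([]byte, error) {
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	const record = "hello raft"
	fmt.Printf("%q\n", readNaive(shortReader(record, 4), len(record)))

	full, err := readExact(shortReader(record, 4), len(record))
	fmt.Printf("%q %v\n", full, err)
}
```

With io.ReadFull, a record that is cut short surfaces as an error (io.ErrUnexpectedEOF) instead of a buffer that is silently half filled.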

I reported this issue, and I’m interested to know if my review is in any way correct. With that said, let us move on…

getEntriesAfter gives us all the in memory entries. That is quite similar to how RavenDB handled indexing, for that matter, so it is amusing. But this applies only to in memory stuff, and it is quite interesting to see how this will interact with other parts of the codebase.

setCommitIndex is interesting. In my head, committing something means flushing it to disk. But in Raft’s terms, committing something means applying the commands. The reason it is interesting is that it has some interesting comments on edge cases. So far, I haven’t seen it actually writing to disk, mind.

And this one gives me a headache:


Basically, this means that we need to write the commit index to the beginning of the file. It is also an extremely unsafe operation. What happens if you crash immediately after? Did your change go through, or not? For that matter, there is nothing that prevents the OS from first writing the changes you made to the beginning of the file, then whatever else you wrote at the end. So a crash might actually leave you with the commit pointer pointing at corrupted data. Luckily, I don’t see anything there actually calling this, though.

The truncate method makes my head ache, mostly because it does things like delete data, which makes me itchy. This is called from the server code as part of normal processing of the append entries request. What this does, in effect, is to say something like: I want you to apply this log entry, make sure that your previous log entry is this, and if it isn’t, revert it back to this entry. This is how Raft ensures that all the logs are the same across the cluster.
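The rule itself is small enough to sketch. What follows is an in-memory Go illustration of the consistency check as the Raft paper describes it, not the go-raft implementation: the Log and Entry types and the Append method are hypothetical, there is no disk involved, and the incoming entries are assumed to be contiguous, starting right after prevIndex.

```go
package main

import "fmt"

// Entry is a single log record; indexes start at 1.
type Entry struct {
	Index   uint64
	Term    uint64
	Command string
}

// Log is an in-memory log; the entry with index i lives at position i-1.
type Log struct{ entries []Entry }

// termAt returns the term of the entry at index; index 0 is the empty prefix.
func (l *Log) termAt(index uint64) (uint64, bool) {
	if index == 0 {
		return 0, true
	}
	if index > uint64(len(l.entries)) {
		return 0, false
	}
	return l.entries[index-1].Term, true
}

// Append accepts the entries only if our log contains the leader's previous
// entry, truncating any conflicting suffix before appending.
func (l *Log) Append(prevIndex, prevTerm uint64, entries []Entry) bool {
	term, ok := l.termAt(prevIndex)
	if !ok || term != prevTerm {
		return false // we are missing the previous entry, or it differs
	}
	for _, e := range entries {
		if existing, ok := l.termAt(e.Index); ok {
			if existing == e.Term {
				continue // we already have this entry
			}
			l.entries = l.entries[:e.Index-1] // conflict: drop it and everything after it
		}
		l.entries = append(l.entries, e)
	}
	return true
}

func main() {
	follower := &Log{entries: []Entry{
		{1, 1, "a"}, {2, 1, "b"}, {3, 2, "stale"},
	}}
	ok := follower.Append(2, 1, []Entry{{3, 3, "c"}})
	fmt.Println(ok, follower.entries)
	fmt.Println(follower.Append(5, 3, []Entry{{6, 3, "d"}}))
}
```

The truncation is exactly the part that deletes data: the follower’s entry at index 3 was never agreed on by the cluster, so it is thrown away in favor of the leader’s version.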

Then we have this:

   1:  // Appends a series of entries to the log.
   2:  func (l *Log) appendEntries(entries []*protobuf.LogEntry) error {
   3:      l.mutex.Lock()
   4:      defer l.mutex.Unlock()
   6:      startPosition, _ := l.file.Seek(0, os.SEEK_CUR)
   8:      w := bufio.NewWriter(l.file)
  10:      var size int64
  11:      var err error
  12:      // Append each entry but exit if we hit an error.
  13:      for i := range entries {
  14:          logEntry := &LogEntry{
  15:              log:      l,
  16:              Position: startPosition,
  17:              pb:       entries[i],
  18:          }
  20:          if size, err = l.writeEntry(logEntry, w); err != nil {
  21:              return err
  22:          }
  24:          startPosition += size
  25:      }
  26:      w.Flush()
  27:      err = l.sync()
  29:      if err != nil {
  30:          panic(err)
  31:      }
  33:      return nil
  34:  }

This seems pretty easy to follow, all told. But note the call to sync() there in line 27, and the fact that this translates down to an fsync, which is horrible for performance.

There is also appendEntry, which appears to be doing the exact same thing as appendEntries and writeEntry. I’m guessing that the difference is that appendEntries is called for a follower, and appendEntry is for a leader.

The last thing to go through in the log.go file is the compact function, which is… interesting:

// compact the log before index (including index)
func (l *Log) compact(index uint64, term uint64) error {
    var entries []*LogEntry

    l.mutex.Lock()
    defer l.mutex.Unlock()

    if index == 0 {
        return nil
    }
    // nothing to compaction
    // the index may be greater than the current index if
    // we just recovery from on snapshot
    if index >= l.internalCurrentIndex() {
        entries = make([]*LogEntry, 0)
    } else {
        // get all log entries after index
        entries = l.entries[index-l.startIndex:]
    }

    // create a new log file and add all the entries
    new_file_path := l.path + ".new"
    file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
    if err != nil {
        return err
    }
    for _, entry := range entries {
        position, _ := l.file.Seek(0, os.SEEK_CUR)
        entry.Position = position

        if _, err = entry.Encode(file); err != nil {
            file.Close()
            os.Remove(new_file_path)
            return err
        }
    }
    file.Sync()

    old_file := l.file

    // rename the new log file
    err = os.Rename(new_file_path, l.path)
    if err != nil {
        file.Close()
        os.Remove(new_file_path)
        return err
    }
    l.file = file

    // close the old log file
    old_file.Close()

    // compaction the in memory log
    l.entries = entries
    l.startIndex = index
    l.startTerm = term
    return nil
}

This code can’t actually run on Windows. Which is interesting. The issue here is that it is trying to rename a file that is open on top of another file which is open. Windows does not allow it.

But the interesting thing here is what this does. We have the log file, which is the persisted state of the in memory entries collection. Every now and then, we compact it by creating a snapshot, and then we create a new file, with only the entries after the newly created snapshot position.

So far, so good, and that gives me a pretty good feeling regarding how the whole thing is structured. Next in line, the peer.go file. This represents a node’s idea about what is going on in another node in the cluster. I find the heartbeat code really interesting:

// Starts the peer heartbeat.
func (p *Peer) startHeartbeat() {
    p.stopChan = make(chan bool)
    c := make(chan bool)
    go p.heartbeat(c)
    <-c
}

// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat(flush bool) {
    p.stopChan <- flush
}

// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat(c chan bool) {
    stopChan := p.stopChan

    c <- true

    ticker := time.Tick(p.heartbeatInterval)

    debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)

    for {
        select {
        case flush := <-stopChan:
            if flush {
                // before we can safely remove a node
                // we must flush the remove command to the node first
                p.flush()
                debugln("peer.heartbeat.stop.with.flush: ", p.Name)
                return
            } else {
                debugln("peer.heartbeat.stop: ", p.Name)
                return
            }

        case <-ticker:
            start := time.Now()
            p.flush()
            duration := time.Now().Sub(start)
            p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
        }
    }
}

startHeartbeat starts a new heartbeat goroutine, and then waits until the heartbeat function notifies it that it has started running.
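That “start a goroutine and wait until it says it is running” handshake is a common Go pattern, and it can be sketched on its own. The Heartbeat type below is hypothetical and is not the go-raft Peer; it uses time.NewTicker (so the ticker can be stopped) where the code above uses time.Tick, and it only signals beats on a channel instead of sending AppendEntries.

```go
package main

import (
	"fmt"
	"time"
)

// Heartbeat is a hypothetical stand-in for the peer's heartbeat loop.
type Heartbeat struct {
	interval time.Duration
	stop     chan struct{}
	done     chan struct{}
	Beats    chan struct{} // gets a value on every tick the consumer keeps up with
}

func NewHeartbeat(intervalMs int) *Heartbeat {
	return &Heartbeat{
		interval: time.Duration(intervalMs) * time.Millisecond,
		Beats:    make(chan struct{}, 1),
	}
}

// Start launches the loop and returns only after the goroutine is running.
func (h *Heartbeat) Start() {
	h.stop = make(chan struct{})
	h.done = make(chan struct{})
	started := make(chan struct{})
	go h.loop(started)
	<-started
}

// Stop asks the loop to exit and waits until it has done so.
func (h *Heartbeat) Stop() {
	close(h.stop)
	<-h.done
}

func (h *Heartbeat) loop(started chan<- struct{}) {
	defer close(h.done)
	close(started) // tell Start that the goroutine is up

	ticker := time.NewTicker(h.interval)
	defer ticker.Stop()
	for {
		select {
		case <-h.stop:
			return
		case <-ticker.C:
			select {
			case h.Beats <- struct{}{}:
			default: // nobody is listening; skip this beat
			}
		}
	}
}

func main() {
	h := NewHeartbeat(10)
	h.Start()
	<-h.Beats
	<-h.Beats
	h.Stop()
	fmt.Println("stopped after two beats")
}
```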

What is confusing is the reference to the peer’s server. Peer is defined as:

// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
    server            *server
    Name              string `json:"name"`
    ConnectionString  string `json:"connectionString"`
    prevLogIndex      uint64
    mutex             sync.RWMutex
    stopChan          chan bool
    heartbeatInterval time.Duration
}

And it seems logical to think that this is a remote peer’s server, but this is actually the local server reference, not the remote one. Note that it is actually the flush method that does the remote call.

Flush is defined as:

func (p *Peer) flush() {
    debugln("peer.heartbeat.flush: ", p.Name)
    prevLogIndex := p.getPrevLogIndex()
    term := p.server.currentTerm

    entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)

    if entries != nil {
        p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
    } else {
        p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
    }
}

The interesting thing here is that the entries collection might be empty (in which case this serves as just a heartbeat). Another thing that comes to mind is that this has the leader explicitly instructing followers to generate snapshots. The Raft paper suggested that this is something that would happen locally on each server, on an independent basis.

There is a lot of interesting behavior in sendAppendEntriesRequest(), not so much in what it does, as in how it handles replies. There is a lot of state going on there. It’s very well commented, so I’ll let you read it, there isn’t anything that is actually going on that is complex.

What is fascinating is that while the transport layer for go-raft is HTTP, which is inherently request/response, it actually handles this in an interesting fashion:

  • Requests are synchronous
  • On reply, the in memory state of the peer is updated immediately
  • The response from the peer is queued to be handled by the server event loop

The end result is that a lot of the handling is centralized into a really pretty state machine. The rest of what is going on there is not very interesting, except for snapshots, but those are covered elsewhere.

And now, we are ready to actually go and look at the server code, but… not yet. It is over a thousand lines of code, so I think that I’ll go over other stuff first. In particular, snapshotting looks interesting.


This is actually quite depressing. Note the State properties here. There is an implicit assumption that it is possible / advisable to go with the entire in memory state like that. I know that I am sensitive to such things, but that seems like an awful lot of waste when talking about large systems.

Here is one such issue:


Let us assume that our state is big, hundreds of MB or maybe a few GB in size.

We currently hold it in memory inside Snapshot.State, then we marshal that to JSON. Now, I actually had to go and check, but Go’s json package does the usual thing and encodes a byte array as a base64 formatted string. What that means, in turn, is that you have an overhead of about 33% (every 3 bytes become 4 characters) that you have to deal with, and this is all allocated in main memory. And then you write it to a file.

This is… quite insane, to be frank.

Assuming that I have a state that is 100 MB in size, I’m going to hold all of that in memory, then allocate roughly another 133 MB just to hold the JSON state, then write it to a file. Why not write it to a file directly in the first place? (You could do CRC along the way.)

The whole thing appears to be assuming small sizes of data. Throughout the entire codebase, actually.

And now, I have no other ways to avoid it, we are going into the server.go itself…

There is a lot of boilerplate stuff there, but the first interesting thing happens when we look at how to apply the log:


This says, when we need to apply it, execute the command method on my state. A lot of the other methods are some variant of:


Nothing to see here at all.

And we finally get to the key part, the event loop:


Let us look in detail at the followerLoop. Inside that function, we have a loop that waits for:

  1. Stop signal, which would lead to us shutting down…
  2. We got an event on our queue…
  3. The timeout for an event has expired…

There is one part there that puzzles me:


promotable will return true if the log has any entries at all. I’m not really sure why that is the case, to be honest. In particular, what about the case when we start with an empty server? I’m going to go on reading the code, and we’ll see where it leads us. And it leads us to:


So next we need to figure out what this self join stuff is. I am not sure if that is something that comes from Raft or from an external source. I found this issue that discusses this, but it isn’t very helpful in terms of understanding who issues the self join command. I tried looking at the etcd codebase, but I didn’t find anything so far. I’ll leave it for now.

The rest of the operations are basically just forwarding the calls to the appropriate methods if they are in the allowed state.

The candidateLoop method isn’t anything special, it follows the Raft paper pretty nicely, although I have to admit the “candidate becomes follower upon Append Entries command” is buried deep. The same is true for the other behaviors. The appropriate state based responses are sometimes hard to figure out, because you have the state loop, then you have the same apparent behavior everywhere. For example, we need to become a follower if we get an Append Entries request. That happens in processAppendEntriesRequest(), but it would actually be easier to see this if we had code duplication. This is a case where getting familiar with the codebase would help in understanding it, and I don’t think that this would be a change worth doing, anyway.

Probably the most interesting behavior is in the leaderLoop when we process a command. A command is added to the server queue using a Do(Command) method. It is then processed in processCommand.

The problem here is that commands are actually appended to the log, and then sent to peers using the heartbeat interval. By default, that stands at 50 ms.

This is great and all, but it does mean that the latency for requests is going to suffer. This doesn’t matter that much for something like etcd. The assumption here is that the requests are all going to be on different things, so we can queue a lot of commands and get pretty good speed overall. It is a problem if, in our system, we have to process sequential operations. In that mode, we can’t wait until the heartbeat, and we want to process this right away. I’ll discuss this later, I think. It is a very important property of this implementation (but not of Raft in general).

Looking at the snapshot state, this happens when a follower gets a SnapshotRequest, but I don’t see anything that sends it. Maybe it is another caller originated thing?

I just looked at the etcd source, and I think that I confirmed that both behaviors are there in the etcd source. So I think that that explains it.

And this is it, basically. I have some thoughts about the implementation of this and of etcd, but I think that this is enough for now… I’ll post them in my next post.

Reviewing go-raft, part I

After going over the etcd codebase, I decided that the raft portion of it deserves a much closer look. The project is here, and I am reviewing commit: 30f261bfe873561c2c75b6206ba1f62a42dbc8d6

Again, I strongly recommend reading the Raft paper. It is quite good. At any rate, assuming that you understand Raft, let us get cracking. This time, I’m reading this in Sublime Text. As usual, I’m reading in lexicographical order, and I’m starting from append_entries.go

AppendEntries is at the very heart of Raft, so I was pleased to see it here:

// The request sent to a server to append entries to the log.
type AppendEntriesRequest struct {
    Term         uint64
    PrevLogIndex uint64
    PrevLogTerm  uint64
    CommitIndex  uint64
    LeaderName   string
    Entries      []*protobuf.LogEntry
}

// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
    pb     *protobuf.AppendEntriesResponse
    peer   string
    append bool
}

However, I didn’t really understand this code. It seemed circular, at least until I realized that we also have a whole lot of generated files. See:


The actual protobuf semantics are (excluding a lot of stuff, of course):

message LogEntry {
    required uint64 Index=1;
    required uint64 Term=2;
    required string CommandName=3;
    optional bytes Command=4; // for nop-command
}

message AppendEntriesRequest {
    required uint64 Term=1;
    required uint64 PrevLogIndex=2;
    required uint64 PrevLogTerm=3;
    required uint64 CommitIndex=4;
    required string LeaderName=5;
    repeated LogEntry Entries=6;
}

message AppendEntriesResponse {
    required uint64 Term=1;
    required uint64 Index=2;
    required uint64 CommitIndex=3;
    required bool   Success=4;
}

So, goraft (which I always read as graft) is using protocol buffers as its wire format. Note in particular that the LogEntry contains the full content of a command, that AppendEntriesRequest has an array of them, and that the AppendEntriesResponse is set up separately. That means that it is very natural to use a one way channel for communication. Even though we do request response, there is a high degree of separation between the request & reply. Indeed, from reading the code in etcd, I thought that was the case.

There is something that really bothers me, though. I noticed it in etcd’s codebase as well. It is things like this:

// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
    pb := &protobuf.AppendEntriesRequest{
        Term:         proto.Uint64(req.Term),
        PrevLogIndex: proto.Uint64(req.PrevLogIndex),
        PrevLogTerm:  proto.Uint64(req.PrevLogTerm),
        CommitIndex:  proto.Uint64(req.CommitIndex),
        LeaderName:   proto.String(req.LeaderName),
        Entries:      req.Entries,
    }

    p, err := proto.Marshal(pb)
    if err != nil {
        return -1, err
    }

    return w.Write(p)
}

I’m not sure about the actual semantics of memory allocations in Go, but let us assume that we had a single log entry with 1KB for the command data. This means that we would have the command data:

  • Once in the LogEntry inside the AppendEntriesRequest
  • Once in the protocol buffers byte array returned from Marshal

There doesn’t appear to be any way to directly stream things. Maybe it is usually dealing with small amounts of data, maybe they didn’t notice, or maybe something in Go makes this very efficient, but I doubt it.

The next interesting part is Command handling. Raft is all about reaching a consensus on the order of executing a set of commands in a cluster. So it is really interesting to see it being handled with Go’s interfaces.

// Command represents an action to be taken on the replicated state machine.
type Command interface {
    CommandName() string
}

// CommandApply represents the interface to apply a command to the server.
type CommandApply interface {
    Apply(Context) (interface{}, error)
}

type CommandEncoder interface {
    Encode(w io.Writer) error
    Decode(r io.Reader) error
}

We have some additional things about serializing commands and reading them back, but nothing beyond this. The Commands.go file, however, is of a little bit more interest. Let us look at the join command:

// Join command interface
type JoinCommand interface {
    Command
    NodeName() string
}

// Join command
type DefaultJoinCommand struct {
    Name             string `json:"name"`
    ConnectionString string `json:"connectionString"`
}

// The name of the Join command in the log
func (c *DefaultJoinCommand) CommandName() string {
    return "raft:join"
}

func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
    err := server.AddPeer(c.Name, c.ConnectionString)

    return []byte("join"), err
}

func (c *DefaultJoinCommand) NodeName() string {
    return c.Name
}

I’m not sure why we have an interface for JoinCommand, then a default implementation like that. I saw that elsewhere in etcd, so it might be a Go pattern. Note that the JoinCommand is an interface that embeds another interface (Command, in this case, obviously).
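One plausible reason for the interface plus default implementation split (this is my guess, not something the code states) is that an application can plug in its own join command carrying extra data. A minimal sketch of interface embedding with a hypothetical customJoin type:

```go
package main

import "fmt"

// Command and JoinCommand mirror the shape of the interfaces quoted above.
type Command interface {
	CommandName() string
}

type JoinCommand interface {
	Command // embedded: every JoinCommand is also a Command
	NodeName() string
}

// customJoin is a hypothetical alternative to the default join command,
// for example one that carries extra data the application cares about.
type customJoin struct {
	name string
	zone string
}

func (c *customJoin) CommandName() string { return "app:join" }
func (c *customJoin) NodeName() string    { return c.name }

// describe accepts any JoinCommand, whether it is the default or a custom one.
func describe(c JoinCommand) string {
	return c.CommandName() + " " + c.NodeName()
}

func main() {
	fmt.Println(describe(&customJoin{name: "node-1", zone: "eu"}))
}
```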

Note that you have the Apply function to actually handle the real work, in this case, adding a peer. There is nothing interesting in config.go, debug.go or context.go, but event.go is puzzling. To be fair, I am really at a loss to explain this style:

// Event represents an action that occurred within the Raft library.
// Listeners can subscribe to event types by using the Server.AddEventListener() function.
type Event interface {
    Type() string
    Source() interface{}
    Value() interface{}
    PrevValue() interface{}
}

// event is the concrete implementation of the Event interface.
type event struct {
    typ       string
    source    interface{}
    value     interface{}
    prevValue interface{}
}

// newEvent creates a new event.
func newEvent(typ string, value interface{}, prevValue interface{}) *event {
    return &event{
        typ:       typ,
        value:     value,
        prevValue: prevValue,
    }
}

// Type returns the type of event that occurred.
func (e *event) Type() string {
    return e.typ
}

// Source returns the object that dispatched the event.
func (e *event) Source() interface{} {
    return e.source
}

// Value returns the current value associated with the event, if applicable.
func (e *event) Value() interface{} {
    return e.value
}

// PrevValue returns the previous value associated with the event, if applicable.
func (e *event) PrevValue() interface{} {
    return e.prevValue
}

Why go to all this trouble to define things this way? It seems like a lot of boilerplate code. It would be easier to just expose a struct directly. I am assuming that this is done so you can send things other than the event struct, with additional information as well. In C# you’d do that by subclassing the event, but you cannot do that in Go. A better alternative might have been to just have a tag / state field in the struct and let it go that way, though.

event_dispatcher.go is just an implementation of a dictionary of string to events, nothing much beyond that. A lot of boilerplate code, too.

http_transporter.go is next, and is a blow to my hope that this will do a one way messaging system. I’m thinking about doing Raft over ZeroMQ or NanoMSG. Here is the actual process of sending data over the wire:

// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
    var b bytes.Buffer
    if _, err := req.Encode(&b); err != nil {
        traceln("transporter.ae.encoding.error:", err)
        return nil
    }

    url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
    traceln(server.Name(), "POST", url)

    t.Transport.ResponseHeaderTimeout = server.ElectionTimeout()
    httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
    if httpResp == nil || err != nil {
        traceln("transporter.ae.response.error:", err)
        return nil
    }
    defer httpResp.Body.Close()

    resp := &AppendEntriesResponse{}
    if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
        traceln("transporter.ae.decoding.error:", err)
        return nil
    }

    return resp
}

This is very familiar territory for me, I have to say. Although, again, there is a lot of wasted memory here from encoding the data multiple times, instead of streaming it directly.

And here is how it receives information:

// Handles incoming AppendEntries requests.
func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        traceln(server.Name(), "RECV /appendEntries")

        req := &AppendEntriesRequest{}
        if _, err := req.Decode(r.Body); err != nil {
            http.Error(w, "", http.StatusBadRequest)
            return
        }

        resp := server.AppendEntries(req)
        if _, err := resp.Encode(w); err != nil {
            http.Error(w, "", http.StatusInternalServerError)
            return
        }
    }
}

Really there is nothing much to write home about, to be frank. All of the operations are like that, just encoding/decoding and forwarding the code to the right function. I’m skipping log.go in favor of going to log_entry.go for a moment. The log is really important in Raft, so I want to focus on small chewables first.

If the user doesn’t provide an encoder for a command, it will be converted using JSON, then serialized to a writer using the protocol buffers format.

One thing that I did notice that was interesting was a bug in decoding from a protocol buffer stream:

// Decodes the log entry from a buffer. Returns the number of bytes read and
// any error that occurs.
func (e *LogEntry) Decode(r io.Reader) (int, error) {

    var length int
    _, err := fmt.Fscanf(r, "%8x\n", &length)
    if err != nil {
        return -1, err
    }

    data := make([]byte, length)
    _, err = r.Read(data)

    if err != nil {
        return -1, err
    }

    if err = proto.Unmarshal(data, e.pb); err != nil {
        return -1, err
    }

    return length + 8 + 1, nil
}

Do you see the bug?

It is in the reading of the data from the reader. A reader may decide to read less than the data that was requested. In this case, I’m assuming that it is always sending fully materialized readers to the Decode method, not surprising given how often it will create in memory buffers for the entire dataset. Still.. that isn’t nice to do, and it can create the most subtle and hard to understand bugs.

And now, into the Log!

// A log is a collection of log entries that are persisted to durable storage.
type Log struct {
    ApplyFunc   func(*LogEntry, Command) (interface{}, error)
    file        *os.File
    path        string
    entries     []*LogEntry
    commitIndex uint64
    mutex       sync.RWMutex
    startIndex  uint64 // the index before the first entry in the Log entries
    startTerm   uint64
}

// The results of the applying a log entry.
type logResult struct {
    returnValue interface{}
    err         error
}

There are a few things that we can notice right now. First, ApplyFunc is how we control the application of stuff to the in memory state, I am assuming. Given that applying the log can only happen after we have a consensus and probably fsynced to disk, it makes sense to invoke it from here.

Then, we also have a file, so that is where we are actually doing a lot of the interesting stuff, like actual storage IO and things like that. The in memory entries array is also interesting, mostly because I wonder just how big it is, and when it is getting truncated. I think that the way it works, we have the log properties, which likely represent the flushed to disk state, and the entries represent the yet to be flushed state.

Things get interesting in the open method, which is called to create a new log or recover an existing one. The interesting part (recovery) is here:

// Read the file and decode entries.
for {
    // Instantiate log entry and decode into it.
    entry, _ := newLogEntry(l, nil, 0, 0, nil)
    entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)

    n, err := entry.Decode(l.file)
    if err != nil {
        if err == io.EOF {
            debugln("open.log.append: finish ")
        } else {
            if err = os.Truncate(path, readBytes); err != nil {
                return fmt.Errorf("raft.Log: Unable to recover: %v", err)
            }
        }
        break
    }
    if entry.Index() > l.startIndex {
        // Append entry.
        l.entries = append(l.entries, entry)
        if entry.Index() <= l.commitIndex {
            command, err := newCommand(entry.CommandName(), entry.Command())
            if err != nil {
                continue
            }
            l.ApplyFunc(entry, command)
        }
        debugln("open.log.append log index ", entry.Index())
    }

    readBytes += int64(n)
}

This is really interesting, because it is actually sending the raw file to the Decode function, unlike what I expected. The reason this is surprising is that there is a strong likelihood that the OS will actually return less data than requested. As it turns out, on Windows, this will never be the case, but it does appear (at least from the contract of the API it ends up calling) that at least on Linux, that is possible. Now, I ended up going all the way to the sys call interface in Linux, so I’m pretty sure that this can’t happen there either, but still…

At any rate, the code appears to be pretty clear. We read the log entry, decode it (truncating the file if there are any issues), and if need be, we apply it.

And I think that this is enough for now… it is close to 9 PM, and I need to do other things as well. I’ll get back to this in my next post.

Reviewing etcd

The etcd project is a project that I stumbled upon that looks interesting. It is a highly-available key value store for shared configuration and service discovery. It is written in Go and is implemented using Raft. I’m reviewing commit 46d817f91b2edf4141081abff7d92a4f71d39248.

I don’t know Go, and I think that this would be a great way to learn both about Raft (which I am very interested in) and about Go (which I peeked at occasionally, but never really studied). Like some of my other posts, this is likely to be a very long and rambling one. For reading the code, I am using LiteIDE, at least for now.

This is what this looks like.


I usually like to do a lexicographical read through the codebase, at least at first. That means that in this case, I have to go through the docs first. Probably not a totally bad idea, but a divergence from my usual approach.

Discovery – it looks like etcd handled the problem of initial peer selection by… going to another etcd cluster, which handles membership information. There is a SaaS offering, it appears (discovery.etcd.io). I like the recursive nature of that, and obviously you can set it up with a static list of peers to start with.

The first real code file I saw was this one, bench.go:

   1: package main
   3: import (
   4:     "flag"
   5:     "log"
   6:     "strconv"
   8:     "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
   9: )
  11: func write(endpoint string, requests int, end chan int) {
  12:     client := etcd.NewClient([]string{endpoint})
  14:     for i := 0; i < requests; i++ {
  15:         key := strconv.Itoa(i)
  16:         _, err := client.Set(key, key, 0)
  17:         if err != nil {
  18:             println(err.Error())
  19:         }
  20:     }
  21:     end <- 1
  22: }
  24: func watch(endpoint string, key string) {
  25:     client := etcd.NewClient([]string{endpoint})
  27:     receiver := make(chan *etcd.Response)
  28:     go client.Watch(key, 0, true, receiver, nil)
  30:     log.Printf("watching: %s", key)
  32:     received := 0
  33:     for {
  34:         <-receiver
  35:         received++
  36:     }
  37: }
  39: func main() {
  40:     endpoint := flag.String("endpoint", "", "etcd HTTP endpoint")
  42:     rWrites := flag.Int("write-requests", 50000, "number of writes")
  43:     cWrites := flag.Int("concurrent-writes", 500, "number of concurrent writes")
  45:     watches := flag.Int("watches", 500, "number of writes")
  47:     flag.Parse()
  49:     for i := 0; i < *watches; i++ {
  50:         key := strconv.Itoa(i)
  51:         go watch(*endpoint, key)
  52:     }
  54:     wChan := make(chan int, *cWrites)
  55:     for i := 0; i < *cWrites; i++ {
  56:         go write(*endpoint, (*rWrites / *cWrites), wChan)
  57:     }
  59:     for i := 0; i < *cWrites; i++ {
  60:         <-wChan
  61:         log.Printf("Completed %d writes", (*rWrites / *cWrites))
  62:     }
  63: }

I include the entire file here because it is short, and really quite interesting. This is my first time really reading Go code, so I had to go and read some docs. The first interesting thing is in line 11, where we have “end chan int”, which defines a channel of integers. This allows cross goroutine (for .NET guys, think tasks / TPL; that isn’t actually accurate, but it is close enough) communication, including “waiting” for results.

The write func will write the specified number of requests, then push a number to the channel, signifying that it completed its work. That is a really nice pattern for doing work, considering that there isn’t a way to await a goroutine.

The watch func is a little hard for me to get, mostly because, as far as I understand, it is setting up a watch for a particular key, then it just accumulates the number of changes to it in a local variable, and doesn’t do anything else with it.

The main function is really funny to read. The flag package is a fascinating way to handle parameter parsing, and it shows how deliberately Go was designed to be a server side language, where command line parsing is really common. The flag package is both powerful and really simple. I think that I’ll probably make use of this approach for configuration in RavenDB.

I love that you define the flag, and then you get a pointer to where that value will be, once you called flag.Parse();

Note that calls to go [expr] are equivalent to Task.Factory.StartNew([expr]) in .NET (not really, but close enough). The syntax <-wChan, for example, means “wait until there is a new value in the channel”. Presumably it also translates to something like “await channel.DequeueAsync()” in C#.

Next was the config package, where etcd is initializing itself. It was interesting to learn that you can have non global flags using the flag package, so you can use the same code for parsing arguments to a method. But that was about it. Nothing exciting there.
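Both points, the pointer you get back when defining a flag and the non global flag sets, fit in one short sketch. The options struct and parseOptions function are hypothetical; the flag names are borrowed from bench.go, and the endpoint address is just an example value.

```go
package main

import (
	"flag"
	"fmt"
)

// options holds the parsed values.
type options struct {
	endpoint string
	writes   int
}

// parseOptions uses its own FlagSet, so it can parse any argument slice
// without touching the process-wide flags.
func parseOptions(args []string) (options, error) {
	fs := flag.NewFlagSet("bench", flag.ContinueOnError)
	endpoint := fs.String("endpoint", "", "etcd HTTP endpoint")
	writes := fs.Int("write-requests", 50000, "number of writes")
	if err := fs.Parse(args); err != nil {
		return options{}, err
	}
	// The pointers hold the defaults until Parse runs, and the parsed values afterwards.
	return options{endpoint: *endpoint, writes: *writes}, nil
}

func main() {
	opts, err := parseOptions([]string{"-endpoint", "http://127.0.0.1:4001", "-write-requests", "10"})
	fmt.Println(opts.endpoint, opts.writes, err)
}
```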

I’m skipping the contrib directory because there is no Go code there, and it doesn’t seem relevant for now. I’ll note that there is a mix of shell scripts, Python and JSON in there, so I’m feeling good about ignoring that for now.

One thing that I really like in Go is that it is very easy to define “extension methods”. In fact, you usually appear to define structs (data holders), and then you just define a function that takes this as the base argument, and you can call it using method syntax. That makes some things very natural and nice. And it also gives you a really nice separation between data & behavior.

I also like the multiple return values option, which gives us a good pattern for reporting errors without getting either crazy syntax or throwing. It also makes it clear when we want to ignore errors. Look at this:

func (d *Discoverer) findPeers() (peers []string, err error) {
    resp, err := d.client.Get(path.Join(d.prefix), false, true)
    if err != nil {
        return nil, err
    }

    node := resp.Node

    if node == nil {
        return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
    }

Trying to do this in C, for example, would lead to an explosion of arrow head return values, or the complexities of “return zero for error, then get the actual issue from GetLastError()”. Multiple return values result in much nicer code than that.

The discovery protocol itself is defined in the docs, but the code implementing it is really nice. Being able to piggy back on etcd itself to implement discovery is really nice.

The fixtures directory seems to be filled with certs files, I am not sure what for, but I’ll go directly to the http directory and see what is going on there. And there doesn’t appear to be anything much, so I’m moving on to the next thing that is actually meaningful.

The metrics section is interesting, mostly because we have been doing the same thing in RavenDB currently. But it all depends on an external package, so I’ll skip this for now. The next interesting thing is in the mod folder, where we have the dashboard module (an HTML5 app, not interesting to me), and the leader & lock modules.

I’ll start with the leader module, where we actually have interesting things. The leader module is very literally just proxying stuff to the lock module. It gets a request, translates that request to a lock module HTTP request, and executes that. Personally, I wouldn’t bother with doing this server side, and would handle this entirely client side, or by calling the lock module methods directly, instead of proxying the request to the lock module. I am not sure why this approach was chosen:

// getHandler retrieves the current leader.
func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) error {
    vars := mux.Vars(req)

    // Proxy the request to the lock service.
    url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"])
    resp, err := h.client.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    w.WriteHeader(resp.StatusCode)
    io.Copy(w, resp.Body)
    return nil
}

The lock module is where real stuff is happening. And at the same time, I am not sure at what level exactly this is happening. What appears to be happening is that the lock module, too, is built on top of the etcd client, rather than using the server directly. This is strange to me, because that isn’t the way I would architect it, but I am guessing that this makes it easier to work with things, having only a single real external API. On the other hand, having a server make HTTP requests to itself seems very strange to me.

One thing that really confused me was a lot of references to things that are actually defined in another repository, the client side of etcd in go. Another interesting thing is the way Go implements interfaces. Instead of using explicit interfaces, if a type has all the methods for an interface, it is implementing that interface.
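A tiny sketch of that implicit interface satisfaction, using hypothetical Locker and memoryLock types that have nothing to do with the etcd lock module:

```go
package main

import (
	"fmt"
	"strings"
)

// Locker is a hypothetical interface; nothing below declares that it implements it.
type Locker interface {
	Acquire(key string) bool
}

// memoryLock never mentions Locker, yet it satisfies it simply by having
// a method with the right name and signature.
type memoryLock struct {
	held map[string]bool
}

func (m *memoryLock) Acquire(key string) bool {
	if m.held[key] {
		return false
	}
	m.held[key] = true
	return true
}

// acquireAll works with anything that has an Acquire method and returns
// the keys it managed to lock.
func acquireAll(l Locker, keys ...string) string {
	var got []string
	for _, k := range keys {
		if l.Acquire(k) {
			got = append(got, k)
		}
	}
	return strings.Join(got, ",")
}

func main() {
	l := &memoryLock{held: map[string]bool{}}
	fmt.Println(acquireAll(l, "a", "b", "a"))
}
```

The compiler checks the match at the point where the *memoryLock is passed as a Locker, so there is no "implements" declaration to keep in sync.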

At this point I decided that I wanted a better IDE and spent some time getting IntelliJ to work with Go. It supports this, and you even get some reference tracking. I couldn’t get all of it to work; in particular, external references weren’t tracked, and I didn’t really care to see why, so I just left it:


At any rate, I was reading the lock module code. In particular, I am now tracking the acquire_handler.go file. It has a major function (acquireHandler)* that actually handles the process of acquiring the lock.

* Sidenote, I like the structure of the code so far, in most files, we have one function, and some supporting functions to help it do some work. It is nice, simple and quite easy to follow.

The first thing that is done is syncing the cluster information. This is done by going to any of the machines that we already know about and asking them about the current state of the cluster. We take the first response, and presumably, since we are running server side, the first response would always be from us (assuming that the requests end up going to the leader). So there isn’t another machine boundary request, but it is still very strange to read it going through so many client operations.

This code is really interesting:

    // Setup connection watcher.
    closeNotifier, _ := w.(http.CloseNotifier)
    closeChan := closeNotifier.CloseNotify()
    stopChan := make(chan bool)

In C#, that would be setting up a cancellation token for the request being abandoned by the client, or for us completing some work.
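The Go equivalent of checking a cancellation token is a select over channels. Here is a minimal sketch with a hypothetical waitForLock helper: it waits for either the lock to be acquired or the "client went away" channel to fire, whichever comes first.

```go
package main

import (
	"errors"
	"fmt"
)

// waitForLock is a hypothetical helper: it waits for the lock to be
// acquired, but gives up as soon as the client connection is reported closed.
func waitForLock(acquired <-chan int, closed <-chan bool) (int, error) {
	select {
	case index := <-acquired:
		return index, nil
	case <-closed:
		return 0, errors.New("user interrupted")
	}
}

func main() {
	acquired := make(chan int, 1)
	closed := make(chan bool, 1)

	closed <- true // simulate the client going away
	_, err := waitForLock(acquired, closed)
	fmt.Println(err)
}
```

In current Go, http.CloseNotifier is deprecated and the request’s Context() plays this role, but the select shape stays the same.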

// If node exists then just watch it. Otherwise create the node and watch it.
node, index, pos := h.findExistingNode(keypath, value)
if index > 0 {
    if pos == 0 {
        // If lock is already acquired then update the TTL.
        h.client.Update(node.Key, node.Value, uint64(ttl))
    } else {
        // Otherwise watch until it becomes acquired (or errors).
        err = h.watch(keypath, index, nil)
    }
} else {
    index, err = h.createNode(keypath, value, ttl, closeChan, stopChan)
}

This is interesting. I am not really able to follow what is going on in the happy case (index > 0) yet. Let us look at what happens with createNode…

   1: // createNode creates a new lock node and watches it until it is acquired or acquisition fails.
   2: func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-chan bool, stopChan chan bool) (int, error) {
   3:     // Default the value to "-" if it is blank.
   4:     if len(value) == 0 {
   5:         value = "-"
   6:     }
   8:     // Create an incrementing id for the lock.
   9:     resp, err := h.client.AddChild(keypath, value, uint64(ttl))
  10:     if err != nil {
  11:         return 0, err
  12:     }
  13:     indexpath := resp.Node.Key
  14:     index, _ := strconv.Atoi(path.Base(indexpath))
  16:     // Keep updating TTL to make sure lock request is not expired before acquisition.
  17:     go h.ttlKeepAlive(indexpath, value, ttl, stopChan)
  19:     // Watch until we acquire or fail.
  20:     err = h.watch(keypath, index, closeChan)
  22:     // Check for connection disconnect before we write the lock index.
  23:     if err != nil {
  24:         select {
  25:         case <-closeChan:
  26:             err = errors.New("user interrupted")
  27:         default:
  28:         }
  29:     }
  31:     // Update TTL one last time if acquired. Otherwise delete.
  32:     if err == nil {
  33:         h.client.Update(indexpath, value, uint64(ttl))
  34:     } else {
  35:         h.client.Delete(indexpath, false)
  36:     }
  38:     return index, err
  39: }

In line 9, we create a child of the key path. Assuming that the key path is foo, this will create an item with foo/1, foo/2, etc. Effectively an auto incrementing number (with no guarantees on the size of the step, mind).

In line 17 we make sure that we keep this alive. The ttlKeepAlive function is really fun to read:

    // ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
    func (h *handler) ttlKeepAlive(k string, value string, ttl int, stopChan chan bool) {
        for {
            select {
            case <-time.After(time.Duration(ttl/2) * time.Second):
                h.client.Update(k, value, uint64(ttl))
            case <-stopChan:
                return
            }
        }
    }

The C# translation for this would be:

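    public async Task TtlKeepAlive(string k, string value, int ttl, CancellationToken t)
    {
        while (true)
        {
            // Wait half the TTL (in seconds), like the Go version does.
            // Task.Delay throws if the token is cancelled during the wait.
            await Task.Delay(TimeSpan.FromSeconds(ttl / 2), t);
            if (t.IsCancellationRequested)
                return;
            client.Update(k, value, ttl);
        }
    }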

But I really like this Go select syntax. It is very much like Erlang’s pattern matching on receive. At any rate, it seems that the magic happens in the watch method.

   1: // watch continuously waits for a given lock index to be acquired or until lock fails.
   2: // Returns a boolean indicating success.
   3: func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error {
   4:     // Wrap close chan so we can pass it to Client.Watch().
   5:     stopWatchChan := make(chan bool)
   6:     stopWrapChan := make(chan bool)
   7:     go func() {
   8:         select {
   9:         case <-closeChan:
  10:             stopWatchChan <- true
  11:         case <- stopWrapChan:
  12:             stopWatchChan <- true
  13:         case <- stopWatchChan:
  14:         }
  15:     }()
  16:     defer close(stopWrapChan)
  18:     for {
  19:         // Read all nodes for the lock.
  20:         resp, err := h.client.Get(keypath, true, true)
  21:         if err != nil {
  22:             return fmt.Errorf("lock watch lookup error: %s", err.Error())
  23:         }
  24:         nodes := lockNodes{resp.Node.Nodes}
  25:         prevIndex := nodes.PrevIndex(index)
  27:         // If there is no previous index then we have the lock.
  28:         if prevIndex == 0 {
  29:             return nil
  30:         }
  32:         // Watch previous index until it's gone.
  33:         waitIndex := resp.Node.ModifiedIndex
  35:         // Since event store has only 1000 histories we should use first node's CreatedIndex if available
  36:         if firstNode := nodes.First(); firstNode != nil {
  37:             waitIndex = firstNode.CreatedIndex
  38:         }
  40:         _, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
  41:         if err == etcd.ErrWatchStoppedByUser {
  42:             return fmt.Errorf("lock watch closed")
  43:         } else if err != nil {
  44:             return fmt.Errorf("lock watch error: %s", err.Error())
  45:         }
  46:     }
  47: }

I have to admit, this makes my head hurt, just a little bit.

But first, the defer syntax Go has is really nice. It is very similar to C#’s using statements, but it isn’t limited to just a specific interface, and it doesn’t introduce a nesting block.

The goroutine in line 7 is interesting. It waits on the close channel, the wrap channel and the stop watch channel, and it forwards a signal from either of the first two to the stop watch channel. I’m really not sure why this is the case, but let’s go with this for now.

The really interesting work happens in the for loop. We get all the keys in the specified key path. Note that we assume that we only have numeric keys in that “directory”. And we basically try to find out whether there is any value that comes before our value.

The easiest way to think about it is in the same way you do when you wait in line in any government queue. You take a number, and you’re first if there is no one with an earlier number than you.

The interesting bit is how Watch is handled. It is basically going to do a long poll request from the server, and the stopWatchChan is used to notify the Watch method when the user cancelled the request, so we don’t need this any longer. I’m really not sure why there is a need for stopWrapChan, but… at least now I understand what is going on here. We use the numbering system to effectively join a queue. Then we wait until we are at the head of the queue.
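Here is a minimal sketch of that “take a number, wait for the one ahead of you” loop, with the etcd calls replaced by a plain slice. The prevIndex and remove functions are my own guesses at the shape of the logic (I am assuming that queue numbers start at 1, so 0 can mean “nobody ahead”); they are not the lock module’s code.

```go
package main

import "fmt"

// prevIndex returns the largest queue number smaller than ours,
// or 0 when nobody is ahead of us (numbers are assumed to start at 1).
func prevIndex(queue []int, ours int) int {
	prev := 0
	for _, n := range queue {
		if n < ours && n > prev {
			prev = n
		}
	}
	return prev
}

// remove returns a copy of queue without n. It stands in for the
// watched node being deleted (lock released or TTL expired).
func remove(queue []int, n int) []int {
	out := make([]int, 0, len(queue))
	for _, q := range queue {
		if q != n {
			out = append(out, q)
		}
	}
	return out
}

func main() {
	queue := []int{3, 7, 12}
	ours := 12
	for {
		prev := prevIndex(queue, ours)
		if prev == 0 {
			fmt.Println("acquired", ours)
			return
		}
		// The real code blocks in Watch here until the previous node is gone.
		fmt.Println("waiting on", prev)
		queue = remove(queue, prev)
	}
}
```

Note that each waiter only watches its immediate predecessor, so a release wakes up one waiter rather than the whole queue.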

Let us go back to the actual acquireHandler routine, more specifically to the findExistingNode() behavior. If we specified a value, we try to find an existing entry in the path that already has this value. If there isn’t a value, we go back to the “take a number, wait” approach. If there is a value, however, I don’t follow the logic. The findExistingNode() has three return values: the relevant node with the value, the index (the queue #, effectively), and the position of the specified node in the queue.

My problem is that I don’t understand the logic here. We find a node with the same value as we want, then we check that it is the first in the queue? What happens when we have two clients issuing the same request at the same time? I understand what happens, I don’t understand what the intention is.

As an aside, I think that I understand why a lot of the internal work in the lock module is done over the HTTP layer. The idea here is to only handle the distribution once. And if you route everything through the HTTP interface, that would be it. This way, you don’t have to worry about how to handle consistency, or stuff like that. And the idea is that you have a simple HTTP interface for a complex system like locking. My own preference would be to do this entirely client side, with no server side behavior, but that puts a lot of the onus on the clients, and it is easier to implement server side if you have a lot of clients for many environments.

Anyway, I think that I found the two pieces that really interest me: store and server.

Store is probably the on disk storage, something that is very near & dear to my heart, while server is the piece that I’ll probably learn the most from…

I’ll start with going over the storage stuff, since that is probably the most familiar to me. Here is the interface for the store:

    type Store interface {
        Version() int
        CommandFactory() CommandFactory
        Index() uint64

        Get(nodePath string, recursive, sorted bool) (*Event, error)
        Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
        Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
        Create(nodePath string, dir bool, value string, unique bool,
            expireTime time.Time) (*Event, error)
        CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
            value string, expireTime time.Time) (*Event, error)
        Delete(nodePath string, recursive, dir bool) (*Event, error)
        CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)

        Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)

        Save() ([]byte, error)
        Recovery(state []byte) error

        TotalTransactions() uint64
        JsonStats() []byte
        DeleteExpiredKeys(cutoff time.Time)
    }

This is really interesting, because from the interface alone you can see some really interesting things. For example, the Save() method. That doesn’t match any transactional store interface that I can think of. To be fair, it looks like it is used for snapshots more than anything else, but still…

Those appear to be the core elements of the store:

    type store struct {
        Root           *node
        WatcherHub     *watcherHub
        CurrentIndex   uint64
        Stats          *Stats
        CurrentVersion int
        ttlKeyHeap     *ttlKeyHeap  // need to recovery manually
        worldLock      sync.RWMutex // stop the world lock
    }

    // node is the basic element in the store system.
    // A key-value pair will have a string value
    // A directory will have a children map
    type node struct {
        Path string

        CreatedIndex  uint64
        ModifiedIndex uint64

        Parent *node `json:"-"` // should not encode this field! avoid circular dependency.

        ExpireTime time.Time
        ACL        string
        Value      string           // for key-value pair
        Children   map[string]*node // for directory

        // A reference to the store this node is attached to.
        store *store
    }

Again, the ability to return multiple values is really nice, see methods such as:

    // Read function gets the value of the node.
    // If the receiver node is not a key-value pair, a "Not A File" error will be returned.
    func (n *node) Read() (string, *etcdErr.Error) {
        if n.IsDir() {
            return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
        }

        return n.Value, nil
    }

This is quite a lovely way to handle things.

However, looking at the Store directory, I am seeing a lot of stuff about modifying the in memory state, but nothing about persistence. I think that this is all handled via Raft. So I’ll just move on to reading the server side code now.

This is the Server interface:

    type Server interface {
        State() string
        Leader() string
        CommitIndex() uint64
        Term() uint64
        PeerURL(string) (string, bool)
        ClientURL(string) (string, bool)
        Store() store.Store
        Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
    }

And then we have the actual request handling. I chose to look at the simplest thing, how we process a read request:

    func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
        vars := mux.Vars(req)
        key := "/" + vars["key"]

        // Help client to redirect the request to the current leader
        if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
            leader := s.Leader()
            hostname, _ := s.ClientURL(leader)

            url, err := url.Parse(hostname)
            if err != nil {
                log.Warn("Redirect cannot parse hostName ", hostname)
                return err
            }
            url.RawQuery = req.URL.RawQuery
            url.Path = req.URL.Path

            log.Debugf("Redirect consistent get to %s", url.String())
            http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect)
            return nil
        }

        recursive := (req.FormValue("recursive") == "true")
        sort := (req.FormValue("sorted") == "true")
        waitIndex := req.FormValue("waitIndex")
        stream := (req.FormValue("stream") == "true")

        if req.FormValue("wait") == "true" {
            return handleWatch(key, recursive, stream, waitIndex, w, s)
        }

        return handleGet(key, recursive, sort, w, s)
    }

If we are requiring consistency, and we aren’t the leader, we’ll redirect the client to the leader. Otherwise, we’ll process the request. Let us assume for now that we are doing a simple get, not a watch. This gives us:

    func handleGet(key string, recursive, sort bool, w http.ResponseWriter, s Server) error {
        event, err := s.Store().Get(key, recursive, sort)
        if err != nil {
            return err
        }

        writeHeaders(w, s)
        b, _ := json.Marshal(event)
        w.Write(b)
        return nil
    }

    func writeHeaders(w http.ResponseWriter, s Server) {
        w.Header().Set("Content-Type", "application/json")
        w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
        w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
        w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
        w.WriteHeader(http.StatusOK)
    }

As you can see, we are basically just getting the current state from the in memory store, and returning it. It would be more interesting to look at how we handle waiting for a value to change, however:

    func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, s Server) error {
        // Create a command to watch from a given index (default 0).
        var sinceIndex uint64 = 0
        var err error

        if waitIndex != "" {
            sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
            if err != nil {
                return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
            }
        }

        watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
        if err != nil {
            return err
        }

        cn, _ := w.(http.CloseNotifier)
        closeChan := cn.CloseNotify()

        writeHeaders(w, s)

        if stream {
            // watcher hub will not help to remove stream watcher
            // so we need to remove here
            defer watcher.Remove()
            for {
                select {
                case <-closeChan:
                    return nil
                case event, ok := <-watcher.EventChan:
                    if !ok {
                        // If the channel is closed this may be an indication of
                        // that notifications are much more than we are able to
                        // send to the client in time. Then we simply end streaming.
                        return nil
                    }

                    b, _ := json.Marshal(event)
                    _, err := w.Write(b)
                    if err != nil {
                        return nil
                    }
                    w.(http.Flusher).Flush()
                }
            }
        }

        select {
        case <-closeChan:
            watcher.Remove()
        case event := <-watcher.EventChan:
            b, _ := json.Marshal(event)
            w.Write(b)
        }
        return nil
    }

The store’s Watch() method is actually interesting, because it exposes some interesting Go concepts (handling full channels, channels for communications, etc). But the important thing is that this will simply wait for a change to happen in the in memory state, and when such a thing happens, it will put a value in the watcher.EventChan channel. So the logic here goes like this:

  • Setup a watch on the in memory state.
  • Wait for a:
    • Change in the items we watch
    • Or user abandoning the request
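As for the “handling full channels” part, here is a minimal sketch of how I imagine a non-blocking notify works, going by the comment in handleWatch about a closed channel meaning that the client can’t keep up. The slowWatcher type and its notify method are hypothetical; I have not checked that the watcher hub is written this way.

```go
package main

import "fmt"

// slowWatcher is a hypothetical watcher with a small buffered event channel.
type slowWatcher struct {
	events  chan string
	removed bool
}

// notify tries to deliver an event without blocking. If the buffer is full,
// the watcher is dropped and its channel closed so the reader can tell.
func (w *slowWatcher) notify(event string) bool {
	if w.removed {
		return false // never send on a closed channel, that would panic
	}
	select {
	case w.events <- event:
		return true
	default:
		w.removed = true
		close(w.events)
		return false
	}
}

func main() {
	w := &slowWatcher{events: make(chan string, 2)}
	for _, e := range []string{"set a", "set b", "set c"} {
		fmt.Println(e, "delivered:", w.notify(e))
	}
	// The reader drains what was buffered, then sees the closed channel.
	for e := range w.events {
		fmt.Println("got", e)
	}
}
```

The point of the default branch is that the writer never stalls on a slow reader; the reader pays the price by losing its subscription.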

There is some interesting stuff here regarding one time watch versus streaming watch, but that appears to be quite easy to follow.

One thing that I can tell you, from my own experience, is that I would actually expect this to have serious issues in production. In particular, web servers can decide that this request takes too long, and just abort it (for example IIS behaves in this manner), or that it timed out. That obviously depends on the server side implementation, and I’m willing to assume that this isn’t the case for whatever HTTP stack etcd uses. However, clients do not. Most clients would give you the benefit of the doubt, but they would abort the request after a while, usually 15 seconds. That might be okay for some purposes, but especially if you want to handle streaming, that isn’t really going to cut it.

More to the point, for long requests, this can cause issues for proxies, firewalls, etc. They’ll decide that the request is closed, and shut it down even if you handled it on both ends properly. With RavenDB, we have a remarkably similar system, but our streaming notifications also incorporate the idea of heartbeat messages. Those are sent every now and then strictly in order to make sure that you’ll get something client side, and that will make all the infrastructure, client side code, etc much much happier.

Enough with the small stuff, let us look at how we handle more complex things. I now intend to take a look at the POST handler. POST operations in etcd have the following format:

curl -XPOST -d value=Job1

The idea is that this will create an automatically named key such as queue/15, queue/853, etc. The POST handler is interesting, because here it is in its entirety.

    func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
        vars := mux.Vars(req)
        key := "/" + vars["key"]

        value := req.FormValue("value")
        dir := (req.FormValue("dir") == "true")
        expireTime, err := store.TTL(req.FormValue("ttl"))
        if err != nil {
            return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
        }

        c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, true)
        return s.Dispatch(c, w, req)
    }

The CreateCreateCommand just creates a data object that holds the parameters, and then we dispatch it. I’m thinking that we can learn quite a lot from how that works.

Dispatch merely sends the command to the leader. This is the relevant code if we are not the leader:

    leader := ps.raftServer.Leader()

    // No leader available.
    if leader == "" {
        return etcdErr.NewError(300, "", s.Store().Index())
    }

    var url string
    switch c.(type) {
    case *JoinCommand, *RemoveCommand:
        url, _ = ps.registry.PeerURL(leader)
    default:
        url, _ = ps.registry.ClientURL(leader)
    }
    uhttp.Redirect(url, w, req)

    return nil

So basically, if there isn’t a leader, we error. That can happen if we have a network split and we are in the minority portion, for example. But usually we’ll just redirect you to the right server to use. But here is the interesting part, where we are the leader, and get to do stuff:

    result, err := ps.raftServer.Do(c)
    if err != nil {
        return err
    }

    if result == nil {
        return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
    }

    // response for raft related commands[join/remove]
    if b, ok := result.([]byte); ok {
        w.WriteHeader(http.StatusOK)
        w.Write(b)
        return nil
    }

    var b []byte
    if strings.HasPrefix(req.URL.Path, "/v1") {
        b, _ = json.Marshal(result.(*store.Event).Response(0))
        w.WriteHeader(http.StatusOK)
    } else {
        e, _ := result.(*store.Event)
        b, _ = json.Marshal(e)

        w.Header().Set("Content-Type", "application/json")
        // etcd index should be the same as the event index
        // which is also the last modified index of the node
        w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index()))
        w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
        w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))

        if e.IsCreated() {
            w.WriteHeader(http.StatusCreated)
        } else {
            w.WriteHeader(http.StatusOK)
        }
    }

    w.Write(b)

    return nil

The ps variable is something called a PeerServer, and I haven’t checked it yet. But all this code is basically doing is: “send this to raft to Do something about it, then reply to the caller”. So let us look at what we are actually doing there. The Do method merely calls the send() method, which looks like this:

    // Sends an event to the event loop to be processed. The function will wait
    // until the event is actually processed before returning.
    func (s *server) send(value interface{}) (interface{}, error) {
        event := &ev{target: value, c: make(chan error, 1)}
        s.c <- event
        err := <-event.c
        return event.returnValue, err
    }

Personally, I think that this is very interesting, and again, very much like the way you would structure an Erlang system. Of particular interest is the idea of event loop. That would be the s.c channel, and I assume that this is meant for a separate goroutine that is processing work on top of that. We ended up with transaction merging in Voron using pretty much the same system.

The s.c channel is a channel of ev pointers. And ev is defined as:

    // An internal event to be processed by the server's event loop.
    type ev struct {
        target      interface{}
        returnValue interface{}
        c           chan error
    }

The interface{} definition is Go’s System.Object, basically. And the error channel is there to mark when we are done processing the event, I assume. I would structure it so we can send a null error as completion, and I bet that this is how this is done.
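Here is a minimal, self-contained sketch of that pattern as I understand it: a single goroutine owns the state, callers hand it a request over a channel, and a nil error on the reply channel marks successful completion. The request and server types are simplified stand-ins that I wrote for this post, not the go-raft types.

```go
package main

import (
	"errors"
	"fmt"
)

// request is a hypothetical event: an input, a slot for the result, and a
// channel on which the loop reports completion (nil means success).
type request struct {
	target      int
	returnValue int
	done        chan error
}

type server struct {
	c chan *request
}

// send hands the request to the loop and blocks until it has been processed.
func (s *server) send(target int) (int, error) {
	r := &request{target: target, done: make(chan error, 1)}
	s.c <- r
	err := <-r.done
	return r.returnValue, err
}

// loop is the only goroutine that touches total, so no lock is needed.
func (s *server) loop() {
	total := 0
	for r := range s.c {
		if r.target < 0 {
			r.done <- errors.New("negative input")
			continue
		}
		total += r.target
		r.returnValue = total
		r.done <- nil // nil error signals completion
	}
}

func main() {
	s := &server{c: make(chan *request)}
	go s.loop()
	fmt.Println(s.send(2))
	fmt.Println(s.send(3))
	fmt.Println(s.send(-1))
}
```

The reply channel has a buffer of one, so the loop never blocks on a caller that is slow to pick up its answer.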

I’m currently assuming that this is being read from the loop() method. And while I usually don’t just comment on comments, this one is really nice:

    //--------------------------------------
    // Event Loop
    //--------------------------------------

    //               ________
    //            --|Snapshot|                 timeout
    //            |  --------                  ______
    // recover    |       ^                   |      |
    // snapshot / |       |snapshot           |      |
    // higher     |       |                   v      |     recv majority votes
    // term       |    --------    timeout    -----------                        -----------
    //            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
    //                 --------               -----------                        -----------
    //                    ^          higher term/ |                         higher term |
    //                    |            new leader |                                     |
    //                    |_______________________|____________________________________ |

This comment promises an interesting function to read…

    func (s *server) loop() {
        defer s.debugln("server.loop.end")

        for {
            state := s.State()

            s.debugln("server.loop.run ", state)
            switch state {
            case Follower:
                s.followerLoop()

            case Candidate:
                s.candidateLoop()

            case Leader:
                s.leaderLoop()

            case Snapshotting:
                s.snapshotLoop()

            case Stopped:
                s.stopped <- true
                return
            }
        }
    }

Let us start by looking at the leaderLoop behavior:

    func (s *server) leaderLoop() {
        s.setState(Leader)
        logIndex, _ := s.log.lastInfo()

        // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
        s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
        for _, peer := range s.peers {
            peer.setPrevLogIndex(logIndex)
            peer.startHeartbeat()
        }

        // Commit a NOP after the server becomes leader. From the Raft paper:
        // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
        // each server; repeat during idle periods to prevent election timeouts
        // (§5.2)". The heartbeats started above do the "idle" period work.
        go s.Do(NOPCommand{})

        // Begin to collect response from followers
        for s.State() == Leader {
            var err error
            select {
            case e := <-s.c:
                if e.target == &stopValue {
                    // Stop all peers before stop
                    for _, peer := range s.peers {
                        peer.stopHeartbeat(false)
                    }
                    s.setState(Stopped)
                } else {
                    switch req := e.target.(type) {
                    case Command:
                        s.processCommand(req, e)
                        continue
                    case *AppendEntriesRequest:
                        e.returnValue, _ = s.processAppendEntriesRequest(req)
                    case *AppendEntriesResponse:
                        s.processAppendEntriesResponse(req)
                    case *RequestVoteRequest:
                        e.returnValue, _ = s.processRequestVoteRequest(req)
                    }
                }

                // Callback to event.
                e.c <- err
            }
        }

        s.syncedPeer = nil
    }

To be perfectly frank, this is really good code. I am loving the structure here. It is really fun to go through and figure out. And the code follows the Raft paper really closely. And… it appears that at some point I actually moved off the etcd code and into the go-raft codebase. I think that I’ll skip the Raft stuff for this blog post, since it is long enough already, and just focus on the etcd stuff for now.

The part that we really care about for this blog post is the processCommand call:

    // Processes a command.
    func (s *server) processCommand(command Command, e *ev) {
        s.debugln("server.command.process")

        // Create an entry for the command in the log.
        entry, err := s.log.createEntry(s.currentTerm, command, e)

        if err != nil {
            s.debugln("server.command.log.entry.error:", err)
            e.c <- err
            return
        }

        if err := s.log.appendEntry(entry); err != nil {
            s.debugln("server.command.log.error:", err)
            e.c <- err
            return
        }

        s.syncedPeer[s.Name()] = true
        if len(s.peers) == 0 {
            commitIndex := s.log.currentIndex()
            s.log.setCommitIndex(commitIndex)
            s.debugln("commit index ", commitIndex)
        }
    }

In createEntry, we effectively serialize the command into JSON, and appendEntry writes it to a file. (I finally found the serialization format, it is JSON for the commands, wrapped in a protobuf envelope.) As an aside, if this was C# code, I would be very worried about the cost of all those allocations. The data is first moved to a JSON buffer, then into a protocol buffer entry, where it is marshaled into another buffer, and only then is it written to the actual file. That is pretty prevalent in the codebase, to be honest. But again, this is Raft stuff that is going on, not etcd stuff. So we’ll ignore this for now and try to see where we actually get to apply the command against our own internal state.

I had to jump through some hoops to figure it out. In particular, commands are applied during recovery, or when we are actually committing the state following a quorum, and this is happening in the Log.ApplyFunc, which is set up externally, and… Anyway, what we actually do is this:

    // Apply command to the state machine.
    switch c := c.(type) {
    case CommandApply:
        return c.Apply(&context{
            server:       s,
            currentTerm:  s.currentTerm,
            currentIndex: s.log.internalCurrentIndex(),
            commitIndex:  s.log.commitIndex,
        })
    case deprecatedCommandApply:
        return c.Apply(s)
    default:
        return nil, fmt.Errorf("Command does not implement Apply()")
    }

And that goes all the way back to the CreateCommand’s Apply function, which does:

    // Create node
    func (c *CreateCommand) Apply(context raft.Context) (interface{}, error) {
        s, _ := context.Server().StateMachine().(store.Store)

        e, err := s.Create(c.Key, c.Dir, c.Value, c.Unique, c.ExpireTime)

        if err != nil {
            log.Debug(err)
            return nil, err
        }

        return e, nil
    }

So, basically, we have Raft that does the hard work of getting a quorum, persistence, etc. The etcd server is responsible for the in-memory state, defining commands, etc.

The really interesting part from my perspective is that we need to process erroneous entries as well, in the same manner. For example, let us say that I want to create a new entry, but only if it isn’t already there. The way it works, even though I know that this would be an error, I have to run this through Raft, get a consensus that we can apply this command, and then we apply the command, see that it is wrong, and return an error. That error leaves no state changes, but it still had to go through the Raft process, it is going to be in the log forever, etc. I’m guessing that the percentage of erroneous commands is low, to be able to tolerate that.
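To make that cost concrete, here is a minimal Go sketch. It is not etcd or go-raft code: `replicatedLog`, `createCmd` and `propose` are hypothetical names, and the whole consensus round is reduced to a plain append, which is an assumption made purely for illustration.

```go
package main

import "errors"

// errKeyExists is returned when a create-if-absent command finds the key.
var errKeyExists = errors.New("key already exists")

// createCmd is a hypothetical "create only if missing" command.
type createCmd struct {
	Key, Value string
}

// replicatedLog stands in for the Raft log plus the in-memory state machine.
// A real implementation would append only after a quorum accepted the entry.
type replicatedLog struct {
	entries []createCmd
	state   map[string]string
}

func newReplicatedLog() *replicatedLog {
	return &replicatedLog{state: make(map[string]string)}
}

// propose appends the command to the log first (the consensus step) and only
// then applies it. A failed apply leaves the state untouched, but the entry
// stays in the log.
func (l *replicatedLog) propose(c createCmd) error {
	l.entries = append(l.entries, c)
	if _, exists := l.state[c.Key]; exists {
		return errKeyExists
	}
	l.state[c.Key] = c.Value
	return nil
}
```

Proposing the same key twice returns an error the second time and leaves the state alone, yet the log ends up with two entries.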

At any rate, that pretty much concludes my review of etcd. It comes to about 20 pages or so, according to my math, and that is quite enough. On the other hand, it might have been 7 posts instead. I would really like to get some feedback on which option you like more, dear reader.

Next, I’m going to go over go-raft. I have some thoughts about it, but I’ll keep them for my next post.

As a side note, I am not, by any means, an experienced Go developer. I hadn’t even read Go code beyond Hello World before I started reading the etcd codebase. But I can tell you that this is a very nice codebase to look at. It is clear and nicely laid out, and it is possible to go through everything and understand what is going on easily.

Distributed counters feature design

This is another experiment with longer posts.

Previously, I used the time series example as the bed on which to test some ideas regarding feature design, to explain how we work and in general work out the rough patches along the way. I should probably note that these posts are purely fiction at this point. We have no plans to include a time series feature in RavenDB at this time. I am trying to work out some thoughts in the open and get your feedback.

At any rate, yesterday we had a request for Cassandra style counters on the mailing list. And as long as I am doing a feature design series, I thought that I could talk about how I would go about implementing this. Again, consider this fiction, I have no plans of implementing this at this time.

The essence of what we want is to be able to… count stuff. Efficiently, in a distributed manner, with optional support for cross data center replication.

Very roughly, the idea is to have “sub counters”, unique for every node in the system. Whenever you increment the value, we log this to our own sub counter, and then replicate it out. Whenever you read it, we just sum all the data we have from all the sub counters.

Let us outline the various parts of the solution in the same order as the one I used for time series.

Storage


A counter is just a named 64-bit signed integer. A counter name can be any string of up to 128 printable characters. The external interface of the storage would look like this:

    public struct CounterIncrement
    {
        public string Name;
        public long Change;
    }

    public struct Counter
    {
        public string Name;
        public string Source;
        public long Value;
    }

    public interface ICounterStorage
    {
        void LocalIncrementBatch(CounterIncrement[] batch);

        Counter[] Read(string name);

        void ReplicatedUpdates(Counter[] updates);
    }

As you can see, this gives us a very simple interface for the storage. We can either change the data locally (which modifies our own storage) or we can get an update from a replica about its changes.

There really isn’t much more to it, to be fair. LocalIncrementBatch() increments a local value, and Read() will return all the values for a counter. There is a little bit of trickery involved in how exactly one would store the counter values.
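The sub counter idea can be sketched in a few lines. This is a minimal in-memory version written in Go rather than the C# of the interface above; `counterStore` and its methods are hypothetical names, and the per-batch and per-source array shapes are flattened to single calls for brevity.

```go
package main

// counterStore keeps one sub counter per (counter name, source node).
type counterStore struct {
	self   string                      // name of the local node
	values map[string]map[string]int64 // counter name -> source -> value
}

func newCounterStore(self string) *counterStore {
	return &counterStore{self: self, values: make(map[string]map[string]int64)}
}

// sub returns the per-source map for a counter, creating it on first use.
func (s *counterStore) sub(name string) map[string]int64 {
	if s.values[name] == nil {
		s.values[name] = make(map[string]int64)
	}
	return s.values[name]
}

// LocalIncrement changes only this node's own sub counter.
func (s *counterStore) LocalIncrement(name string, change int64) {
	s.sub(name)[s.self] += change
}

// ReplicatedUpdate overwrites the sub counter owned by another node with the
// value that node sent us.
func (s *counterStore) ReplicatedUpdate(name, source string, value int64) {
	s.sub(name)[source] = value
}

// Read sums the sub counters from every source.
func (s *counterStore) Read(name string) int64 {
	var total int64
	for _, v := range s.values[name] {
		total += v
	}
	return total
}
```

Because a replicated update carries the full value of the remote sub counter rather than a delta, applying the same update twice does not change the sum.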

For now, I think we’ll store each counter as two step values. We’ll have a tree of multi tree values that will carry each value from each source. That means that a counter will take roughly 4 KB or so. This is easy to work with and nicely fits the model Voron uses internally.

Note that we’ll outline additional requirements for storage (searching for counters by prefix, iterating over counters, addresses of other servers, stats, etc) below. I’m not showing them here because they aren’t the major issue yet.

Over the wire

Skipping out on any optimizations that might be required, we will expose the following endpoints:

  • GET /counters/read?id=users/1/visits&users/1/posts <-- will return a JSON response with all the relevant values (already summed up).
    { "users/1/visits": 43, "users/1/posts": 3 }
  • GET /counters/read?id=users/1/visits&users/1/posts&raw=true <-- will return a JSON response with all the relevant values, per source.
    { "users/1/visits": { "rvn1": 21, "rvn2": 22 }, "users/1/posts": { "rvn1": 2, "rvn3": 1 } }
  • POST /counters/increment <-- allows incrementing counters. The request is a JSON array of counter names and changes.
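As a rough illustration of the two read shapes, here is a Go sketch that builds both response bodies from the same per-source data. `sourceValues` and `readResponse` are hypothetical names, and the HTTP plumbing is left out on purpose.

```go
package main

import "encoding/json"

// sourceValues maps counter name -> source node -> value of that sub counter.
type sourceValues map[string]map[string]int64

// readResponse renders the body for /counters/read. With raw=false each
// counter is summed across its sources; with raw=true the per-source values
// are returned unchanged.
func readResponse(values sourceValues, raw bool) (string, error) {
	var payload interface{} = values
	if !raw {
		totals := make(map[string]int64, len(values))
		for name, perSource := range values {
			var sum int64
			for _, v := range perSource {
				sum += v
			}
			totals[name] = sum
		}
		payload = totals
	}
	body, err := json.Marshal(payload)
	return string(body), err
}
```

Both shapes come from the same stored data, so the raw=true variant is essentially free once the summed one exists.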

For a real system, you’ll probably need a lot more stuff, metrics, stats, etc. But this is the high level design, so this would be enough.

Note that we are skipping the high performance stream based writes we outlined for time series. We probably won’t need them, so that doesn’t matter, but they are an option if we do.

System behavior

This is where it is really not interesting. There is very little behavior here, actually. We only have to read the data from the storage, sum it up, and send it to the user. Hardly what I’d call business logic.

Client API

The client API will probably look something like this:

    counters.Increment("users/1/posts");
    counters.Increment("users/1/visits", 4);

    using (var batch = counters.Batch())
    {
        batch.Increment("users/1/posts");
        batch.Increment("users/1/visits", 5);
        batch.Submit();
    }

Note that we’re offering both batch and single API. We’ll likely also want to offer a fire & forget style, which will be able to offer even better performance (because they could do batching across more than a single thread), but that is out of scope for now.

For simplicity’s sake, the client is going to be just a container for all of the endpoints that it knows about. The container would be responsible for updating the client visible topology, selecting the best server to use at any given point, etc.

User interface

There isn’t much to it. Just show a list of counter values. Allow searching by prefix, and allow diving into a particular counter to read its raw values, but that is about it. Oh, and allow deleting a counter.

Deleting data

Honestly, I really hate deletes. They are very expensive to handle properly the moment you have more than a single node. In this case, there is an inherent race condition between a delete going out and another node getting an increment. And then there is the issue of what happens if you had a node down when you did the delete, etc.

This just sucks. Deletions are handled normally (with the race condition caveat, obviously), and I’ll discuss how we replicate them in a bit.

High availability / scale out

By definition, we actually don’t want to have storage replication here, either log shipping or consensus based. We actually do want to have different values on each node, because we are going to be modifying things independently on many servers.

That means that we need to do replication at the database level. And that leads to some interesting questions. Again, the hard part here is the deletes. Actually, the really hard part is what we are going to do with the New Server Problem.

The New Server Problem dictates how we are going to bring a new server into the cluster. If we could fix the size of the cluster, that would make things a lot easier. However, we are actually interested in being able to dynamically grow the cluster size.

Therefore, there are only two real ways to do it:

  • Add a new empty node to the cluster, and have it be filled from all the other servers.
  • Add a new node by backing up an existing node, and restoring as a new node.

RavenDB, for example, follows the first option. But it means that it needs to track a lot more information. The second option is actually a lot simpler, because we don’t need to care about keeping around old data.

However, this means that the process of bringing up a new server would now be:

  1. Update all nodes in the cluster with the new node address (node isn’t up yet, replication to it will fail and be queued).
  2. Backup an existing node and restore at the new node.
  3. Start the new node.

The order of steps is quite important, and it would be easy to get it wrong. Also, on large systems, backup & restore can take a long time. Operationally speaking, I would much rather just be able to bring a new node into the cluster in “silent” mode. That is, it would get information from all the other nodes, and I can “flip the switch” and make it visible to clients at any point in time. That is how you do it with RavenDB, and it is an incredibly powerful system, when used properly.

That means that for all intents and purposes, we don’t do real deletes. What we’ll actually do is replace the counter value with a delete marker. This turns deletes into a much simpler “just another write”. It has the sad implication of not freeing disk space on deletes, but deletes tend to be rare, and it is usually fine to add a “purge” admin option that can be run on an as-needed basis.
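A delete marker is easy to sketch. The Go snippet below is an illustration only: `entry`, `counterStore`, `Delete` and `Purge` are hypothetical names, it tracks a single value per counter instead of one per source, and it deliberately ignores the increment-versus-delete race mentioned earlier.

```go
package main

// entry is a stored counter value; deleted marks it as a delete marker.
type entry struct {
	value   int64
	deleted bool
}

type counterStore struct {
	entries map[string]entry
}

func newCounterStore() *counterStore {
	return &counterStore{entries: make(map[string]entry)}
}

// Set writes a live value for the counter.
func (s *counterStore) Set(name string, value int64) {
	s.entries[name] = entry{value: value}
}

// Delete is just another write: it replaces the value with a delete marker.
func (s *counterStore) Delete(name string) {
	s.entries[name] = entry{deleted: true}
}

// Read hides delete markers from callers.
func (s *counterStore) Read(name string) (int64, bool) {
	e, ok := s.entries[name]
	if !ok || e.deleted {
		return 0, false
	}
	return e.value, true
}

// Purge is the admin operation that actually frees the space held by
// delete markers. It returns how many markers were removed.
func (s *counterStore) Purge() int {
	removed := 0
	for name, e := range s.entries {
		if e.deleted {
			delete(s.entries, name)
			removed++
		}
	}
	return removed
}
```

Until Purge() runs, a deleted counter still occupies an entry, which is the disk space trade-off described above.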

But that brings us to an interesting issue: how do we actually handle replication?

The topology map

To simplify things, we are going to go with one way replication from a node to another. That allows complex topologies like master-master, cluster-cluster, replication chain, etc. But in the end, this is all about a single node replicating to another.

The first question to ask is, are we going to replicate just our local changes, or are we going to have to replicate external changes as well? The problem with replicating external changes is that you may have the following topology:


Now, Server A got a value and sent it to Server B. Server B then forwarded it to Server C. However, at that point, we also have the value from Server A replicated directly to Server C. Which value is it supposed to pick? And what about a scenario where you have a more complex topology?

In general, because in this type of system, we can have any node accept writes, and we actually desire this to be the case, we don’t want this behavior. We want to only replicate local data, not all the data.

Of course, that leads to an annoying question: what happens if we have a 3 node cluster, and one node fails catastrophically? We can bring a new node in, and the other two nodes will be able to fill in their values via replication, but what about the values of the node that is down? The data isn’t gone, it is still right there in the other two nodes, but we need a way to pull it out.

Therefore, I think that the best option would be to say that nodes only replicate their local state, except in the case of a new node. A new node will be told the address of an existing node in the cluster, at which point it will:

  • Register itself with all the nodes in the cluster (discoverable from the existing node). This assumes a standard two way replication link between all servers. If this isn’t the case, the operators are responsible for setting up the actual replication semantics on their own.
  • Start getting updates from all the nodes in the cluster. It keeps them in a log for now, not doing anything with them yet.
  • Ask the existing node for a complete copy of its current state.
  • When it has the complete state of the existing node, replay all of the logged updates that it didn’t have a chance to apply yet.
  • Announce that it is in a valid state to start accepting client connections.
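The steps above can be sketched as a small state machine. This Go version is only a sketch under stated assumptions: `joiningNode`, `Receive` and `Bootstrap` are hypothetical names, updates carry full sub counter values (not deltas), and registration with the cluster is assumed to have already happened.

```go
package main

// update is one replicated sub counter value.
type update struct {
	Name   string
	Source string
	Value  int64
}

// joiningNode buffers incoming updates until it has loaded a full copy of an
// existing node's state.
type joiningNode struct {
	values  map[string]map[string]int64 // counter name -> source -> value
	pending []update                    // updates received before we were ready
	ready   bool
}

func newJoiningNode() *joiningNode {
	return &joiningNode{values: make(map[string]map[string]int64)}
}

func (n *joiningNode) apply(u update) {
	if n.values[u.Name] == nil {
		n.values[u.Name] = make(map[string]int64)
	}
	n.values[u.Name][u.Source] = u.Value
}

// Receive handles an update from another node. Before bootstrap completes
// the update is only logged.
func (n *joiningNode) Receive(u update) {
	if !n.ready {
		n.pending = append(n.pending, u)
		return
	}
	n.apply(u)
}

// Bootstrap loads the complete state of an existing node, replays the
// buffered updates in arrival order, and marks the node as ready.
func (n *joiningNode) Bootstrap(snapshot []update) {
	for _, u := range snapshot {
		n.apply(u)
	}
	for _, u := range n.pending {
		n.apply(u)
	}
	n.pending = nil
	n.ready = true
}

// Read sums all sub counters for a counter.
func (n *joiningNode) Read(name string) int64 {
	var total int64
	for _, v := range n.values[name] {
		total += v
	}
	return total
}
```

Only after Bootstrap() returns would the node announce itself to clients. Until then Read() reflects nothing but whatever part of the snapshot has been applied.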

Note that this process is likely to be very sensitive to high data volumes. That is why you’ll usually want to select a backup node to read from, and that decision is an ops decision.

You’ll also want to be able to report extensively on the current status of the node, since this can take a while, and ops will be watching this very closely.

Server Name

A node requires a unique name. We can use guids, but those aren’t readable. We can use machine name + port, but those can change. Ideally, we can require the user to set us up with a unique name. That is important for readability and for being able to later see all the values we have in all the nodes. It is important that names are never repeated, so we’ll probably have a guid there anyway, just to be on the safe side.

Actual Replication Semantics

Since we have the New Server Problem down to an automated process, we can choose the drastically simpler model of just having an internal queue per each replication destination. Whenever we make a change, we also make a note of that in the queue for that destination, then we start an async replication process to that server, sending all of our updates there.

It is always safe to overwrite data using replication, because we are overwriting our own data, never anyone else.
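A minimal Go sketch of the per-destination queue follows. The names (`replicator`, `Increment`, `Drain`) are hypothetical, the async sending itself is left out, and remembering only which counters changed (rather than every change) is my assumption about how such a queue could be kept small.

```go
package main

import "sort"

// update is the full current value of one of our own sub counters.
type update struct {
	Name   string
	Source string
	Value  int64
}

// replicator tracks, per destination, which local counters have changed
// since the last batch was sent there.
type replicator struct {
	name   string
	local  map[string]int64               // our own sub counters
	queues map[string]map[string]struct{} // destination -> changed counter names
}

func newReplicator(name string, destinations ...string) *replicator {
	r := &replicator{
		name:   name,
		local:  make(map[string]int64),
		queues: make(map[string]map[string]struct{}),
	}
	for _, d := range destinations {
		r.queues[d] = make(map[string]struct{})
	}
	return r
}

// Increment changes the local value and notes the change for every destination.
func (r *replicator) Increment(counter string, change int64) {
	r.local[counter] += change
	for _, q := range r.queues {
		q[counter] = struct{}{}
	}
}

// Drain returns the batch to send to a destination and empties its queue.
// Each update carries the current value, so the receiver can simply overwrite.
func (r *replicator) Drain(destination string) []update {
	q := r.queues[destination]
	names := make([]string, 0, len(q))
	for name := range q {
		names = append(names, name)
		delete(q, name)
	}
	sort.Strings(names)
	batch := make([]update, 0, len(names))
	for _, name := range names {
		batch = append(batch, update{Name: name, Source: r.name, Value: r.local[name]})
	}
	return batch
}
```

Since every update only ever describes the sender's own sub counter, the receiver never has to merge anything. It overwrites the value it holds for that source.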

And… that is about it, actually. There are probably a lot of details that I am missing / would discover if we were to actually implement this. But I think that this is a pretty good idea about what this feature is about.


Time series feature design: Storage replication & the bee’s knees

Being able to handle replication at the storage level is a really nice feature to have. More than that, it is a feature that can be broadly applied. But… a database is a lot more than just storage. And being able to just move the data around between machines is nice, but there are other things we have to take into account.

In particular, when we replicate via storage changes, we don’t have a good way to take actions on changes. Most of the time, that means that we can’t actually rely on internal caches, and would probably have to deal with that in some other fashion. But there is usually secondary processing done on a node that would have to be accounted for.

For example, let us assume that we had the ability to replicate RavenDB (docs) changes between machines using storage replication. The problem here is that we would be replicating the documents, but not the indexes, and when we do that, we would need to index the changed documents on the destination node. However, that would actually require two data stores, one for the actual documents data, and one for all of the non replicated data (indexing, stats, etc).

In other words, I think that such a database would have to be designed specifically for that scenario.

In addition to that, it would probably be best for the storage replication to also be annotated with information for higher level code. So if you modify this range in the file, you’ll also know that you need to drop the following documents from the cache.


Time series feature design: The Consensus has dRafted a decision

So, after reaching the conclusion that replication is going to be hard, I went back to the office and discussed those challenges and was in general pretty annoyed by it. Then Michael made a really interesting suggestion. Why not put it on RAFT?

And once he explained what he meant, I really couldn’t hold my excitement. We now have a major feature for 4.0. But before I get excited about that (we’ll only be able to actually start working on that in a few months, anyway), let us talk about what the actual suggestion was.

Raft is a consensus algorithm. It allows a distributed set of computers to arrive at a mutually agreed upon set of sequential log records. Hm… I wonder where else we can find sequential log records, and yes, I am looking at you, Voron.Journal.

The basic idea is that we can take the concept of log shipping, but instead of having a single master/slave relationship, we change things so we can put Raft in the middle. When committing a transaction, we’ll hold off committing the transaction until we have a Raft consensus that it should be committed. The advantage here is that we won’t be constrained any longer by the master/slave issue. If there is a server down, we can still process requests (maybe need to elect a new cluster leader, but that is about it).

That means that from an architectural standpoint, we’ll have the ability to process write requests as long as a quorum (N/2+1) of the nodes is available. That is a pretty standard requirement for distributed databases, so that is perfectly fine.
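For reference, the quorum arithmetic is small enough to write down. The Go helpers below are hypothetical names, and N/2+1 uses integer division.

```go
package main

// quorum returns the minimum number of nodes that must agree in a cluster of
// n nodes: n/2+1, using integer division.
func quorum(n int) int {
	return n/2 + 1
}

// canWrite reports whether writes can proceed when only `up` of the n nodes
// are reachable.
func canWrite(n, up int) bool {
	return up >= quorum(n)
}

// toleratedFailures returns how many nodes can be down while a quorum is
// still available.
func toleratedFailures(n int) int {
	return n - quorum(n)
}
```

A 3 node cluster needs 2 nodes and tolerates 1 failure, and a 5 node cluster needs 3 and tolerates 2. A 4 node cluster also needs 3, so it tolerates no more failures than a 3 node one.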

That is a pretty awesome thing to have, to be honest, and more importantly, this is happening at the low level storage layer. That means that we can apply this behavior not just to a single database solution, but to many database solutions.

I’m pretty excited about this.

Time series feature design: Replication

Armed with the knowledge about replication strategies from my previous post, we can now consider this in the context of the time series database.

We actually have two distinct pieces of data that we track. We have the actual time data, the timestamp and value that we keep track of, and we have the series information (tags, mostly). We can consider using log shipping here, and that would give us a great way to get a read replica. But the question is why we would want to do that. It is nice to get a replica, but that replica would have to be read only. Is that useful? It could take over if the master is down, but that would mean that the master would have to stay down (or be converted to a slave). And divergent writes are a problem.

While attractive as a cheap way to handle replication, I don’t like this very much.

So that leaves us with a multi write partners situation. In this case, we can allow the two servers to operate in tandem. We need to have some way to resolve conflicts, and this is where things get a bit messy.

For series data, it is trivial to just use some form of last write wins. This assumes a synchronized clock between the servers, but we’ll leave that requirement for now.

The problem is with the actual time data. Conceptually, we intend to store the information like this:


The problem is how you detect conflicts, and whether they are even possible. Let us assume that we want to update a particular value at time T on both servers. Server A replicates to server B, and now we need to decide how to deal with it. Ignore the value? Overwrite the value?

The important thing is that we need some predictable way to handle this that will end up with all the nodes in the cluster having the same agreed upon value. The simplest scenario, assuming a clock sync, is to use the write timestamp. But that would require us to keep the write time stamp. Currently we can use just 16 bytes for each time record. But recording the write timestamp will increase our usage to 24 bytes. That is a 50% increase just to handle conflicts. I don’t want to pay that.

The good thing about time series data is that a single value isn’t that important, and the likelihood that it will be changed is relatively small. We can just decide to say: We’ll choose a value, for example, we will choose the maximum value for that time, and be done with it. That has its own set of problems, but we’ll deal with that in a bit. We need to discuss how we deal with replication in general, first.
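To show why a rule like “take the maximum” is attractive, here is a small Go sketch. `resolve` and `merge` are hypothetical names, and storing the value as a float64 keyed by an int64 timestamp is my assumption. The point is only that the result does not depend on the order in which the writes arrive, with no extra bytes stored per record.

```go
package main

// resolve picks the value to keep when two servers wrote different values
// for the same timestamp. Here the rule is simply "the maximum wins".
func resolve(a, b float64) float64 {
	if a > b {
		return a
	}
	return b
}

// merge applies an incoming value for a timestamp to a series, resolving a
// conflict with the stored value if there is one.
func merge(series map[int64]float64, timestamp int64, incoming float64) {
	current, exists := series[timestamp]
	if !exists {
		series[timestamp] = incoming
		return
	}
	series[timestamp] = resolve(current, incoming)
}
```

Two servers that see the same pair of writes in opposite order end up holding the same value. This says nothing yet about how the resolved value travels through the cluster.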

Let us imagine that we have 3 servers:

  • Server A replicates to B
  • Server B replicates to C
  • Server C replicates to A

We have concurrent writes to the same time value on both server A and B. For the purpose of the discussion, let us assume that we have a way to resolve the conflict.

Server A notifies Server B about the change, but Server B already has a different value for that. Conflict resolution is run, and we have a new value. That value needs to be replicated downstream. It goes to Server C, who then replicates it to Server A, who then replicates it to Server B? Ad infinitum?

I intentionally chose this example, but the same thing can happen with just two servers replicating to one another (master/master). And the problem here is that in order to be able to actually track this properly, we are going to need to keep a lot of metadata around, per value. While I can sort of accept the need to keep the write time (thus requiring 50% more space per value), the idea of holding many times more metadata for replication purposes than the actual data we want to replicate seems… silly at best.

Log shipping replication it is, at least until someone can come up with a better way to resolve the issues above.