Ayende @ Rahien


Decompression code & Discussion

As I said, this is a good, small interview question. It is a good one because it is short and relatively simple to handle, but it reveals a lot about your code. To start with, you are faced with a non-trivial task that most people are not that familiar with.

Implement a Decompress(Stream input, Stream output, byte[][] dictionary) routine for the following protocol:

Prefix byte – composed of the highest bit + 7 bits len

If the highest bit is not set, this is uncompressed data; copy the next len bytes from the input to the output.

If the highest bit is set, this is compressed data; the next byte is the index into the dictionary, so copy len bytes from that dictionary entry to the output.
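
To make the byte layout concrete, here is a tiny hand-built example of the protocol (my own illustration in Go, not part of the original question). With dictionary[0] = "hello world", the stream below decodes to "abchello":

package main

import "fmt"

func main() {
    // 0x03 'a' 'b' 'c' : high bit clear, len = 3 -> copy "abc" straight from the input
    // 0x85 0x00        : high bit set,   len = 5 -> copy 5 bytes from dictionary[0] ("hello")
    encoded := []byte{0x03, 'a', 'b', 'c', 0x85, 0x00}
    dictionary := [][]byte{[]byte("hello world")}
    fmt.Printf("% x -> %q + %q\n", encoded, encoded[1:4], dictionary[0][:5])
}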

I couldn’t resist doing this myself, and I came up with the following:

public void Decompress(Stream input, Stream output, byte[][] dictionary)
{
    var tmp = new byte[128]; // len is only 7 bits, so 127 bytes is the maximum run
    while (true)
    {
        var readByte = input.ReadByte();
        if (readByte == -1) // end of the input stream
            break;

        var prefix = (byte) readByte;
        var compressed = (prefix & 0x80) != 0; // highest bit marks a dictionary reference
        var len = prefix & 0x7f;               // lower 7 bits are the length

        if (compressed == false)
        {
            // Uncompressed run: copy len bytes from the input, handling partial reads
            while (len > 0)
            {
                var read = input.Read(tmp, 0, len);
                if (read == 0)
                    throw new InvalidDataException("Not enough data to read from compressed input stream");
                len -= read;
                output.Write(tmp, 0, read);
            }
        }
        else
        {
            // Compressed run: the next byte selects the dictionary entry to copy from
            readByte = input.ReadByte();
            if (readByte == -1)
                throw new InvalidDataException("Not enough data to read from compressed input stream");
            output.Write(dictionary[readByte], 0, len);
        }
    }
}

Things to pay attention to: Low memory allocations, error handling, and handling of partial reads from the stream.

But that is just part of the question. After reading the protocol and implementing it, the question turns to what the protocol says about this kind of compression scheme. Using just 7 bits to store len drastically limits its utility as a general compression format. It also requires an external dictionary, which most compression formats don’t use; they use the previously seen compressed text itself as the dictionary. Of course, I’ve been reading compression algorithms for a while now, so that isn’t entirely fair. But I would expect people to note that the 7 bit len limits the compression usability.

And hopefully, with a bit of a hint, they should note that the external dictionary is useful for small data sets where the repetitions are actually between entries, not within a single entry.

Interview challenges: Decompress that

I’m always looking for additional challenges that I can ask people who interview at Hibernating Rhinos, and I ran into an interesting challenge just now.

Implement a Decompress(Stream input, Stream output, byte[][] dictionary) routine for the following protocol:

Prefix byte – composed of the highest bit + 7 bits len

If the highest bit is not set, this is uncompressed data; copy the next len bytes from the input to the output.

If the highest bit is set, this is compressed data; the next byte is the index into the dictionary, so copy len bytes from that dictionary entry to the output.

After writing the code, the next question is going to be: what are the implications of this protocol? What is it going to be good for? What is it going to be bad for?

The dark sides of Lucene

I’ve been using Lucene for the past six or seven years, and after my last post, I thought it would be a good idea to talk a bit about the kind of things that it isn’t doing well. We’ve been using it extensively in RavenDB for the past 5 years, and I think that I have a pretty good understanding of it. We used to have one of the Lucene.NET committers working at Hibernating Rhinos, so I have a high level of confidence that I’m not just stupidly misusing it, either.

Probably the part that caused us the most pain with Lucene is the fact that it isn’t transactional. That is, it is quite easy to get into situations where the indexes are corrupted. That makes it… challenging to use in a database that needs to ensure consistency. The problem is that this really isn’t a use case that Lucene is well suited for. In order to ensure that data is saved, we have to commit often; the problem is that in order to ensure good performance, we want to commit less often, but then we will lose the changes if we crash. For that matter, Lucene doesn’t make any attempt to actually flush the data properly, relying on the OS to do that, so a system crash can cause you to lose data even though you “committed” it.

Fun times, I can tell you that.

Next, we have the issue of what Lucene calls updates. Updates in Lucene are actually just a delete followed by an add, and they don’t maintain the same document id (more on that later). Because of that, you usually have to have an additional field in the index that serves as your primary key, and you handle updates by first deleting and then adding things. That is quite strange, to be fair, and it means that you can’t “extend” an index entry; you have to build it from scratch every time.

Speaking of this, let us talk a bit about deletes. Ignoring for the moment the absolutely horrendous decision to do deletes through the reader, let us talk about how they are actually done. Deletes are recorded in a separate file, and that means that the moment you have any deletes (or, as I mentioned, updates), all the internal statistics are wrong. We run into this quite often with RavenDB when we are doing things like facets or suggestions. For example, if you request a suggestion for a user name, it will happily give you suggestions based on deleted users, even though we deleted them in Lucene.

It will go away eventually, when Lucene gets around to optimizing the index by merging all the files, but in the meantime, it makes for interesting bug reports. Speaking of merging, that is another common issue that you have to deal with. In order to ensure optimal performance, you have to be on top of the merge policy. This results in some interesting issues. For RavenDB’s purposes, we do a writer commit after every indexing batch. That means that if you are writing to RavenDB slowly enough, we do a commit after every document write. That results in a lot of segments, and the merge policy has to do a lot of merges. The problem here is that merges have two distinct costs associated with them.

First, and obviously, you are going to need to write (again) all of the documents in all of the segments you are merging. That is very similar to doing merges in LevelDB (indeed, in general Lucene’s file format is remarkably similar to SST). Next, and arguably more interesting / problematic from our point of view, is the fact that it also kills all of the caches. Let me try to explain: Lucene uses a lot of caches to speed things up; in fact, most of the sorting is done using the caches, for example. That works really well when we are querying normally, because segments are immutable, which makes for great caching. But on a merge, not only have we just invalidated all of our caches, we now need to read, again, all of the data that we just wrote, so we can use it. That can be… costly. And both things can introduce stalls into the system.

The major problem externally with merges is that the document id changes, and that means that you cannot rely on them. It would be much easier if you could send an id out into the world, and get it back later and do something with it, but that isn’t possible with Lucene.

Next, and not really an operational issue like the rest, Lucene’s multi threaded behavior is… a hammer to an egg, in most cases. By that I mean code like this:

[image: screenshot not preserved]

I mean, it is certainly functional, but it is pretty ugly.

Now, don’t get me wrong, I think that Lucene is pretty neat. But there are some really dark corners there. For example, the actual searching: go ahead and try to find where that is done in Lucene. It is very easy to get lost between all of the different aspects: weights, sorters, queries and various enumerators. For fun, a lot of that runs at hard to pin down times, which makes the actual query runtime interesting to try to figure out.

As a good example, let us take the simplest possible query, TermQuery. Go ahead, try to find where it is actually doing the query for matching terms in this code: https://github.com/apache/lucene/blob/LUCENE_2_1/src/java/org/apache/lucene/search/TermQuery.java

That actually happens here: https://github.com/apache/lucene/blob/LUCENE_2_1/src/java/org/apache/lucene/search/TermScorer.java#L79, and it is effectively a side effect of calling reader.termDocs(term) that limits the matches only to those with the same term. Trying to track down where exactly things happen can be… interesting.

Anyway, this post is getting too long, and I want to get back to figuring out how Lucene does its thing without dwelling too much in the dark…


What Lucene does, a look under the hood

Lucene is a search engine library, which is great. But as it turns out, there is a lot going on there. After working with it for several years, I can say with confidence that it is a pretty awesome library. But surprisingly, a lot of the effort that went into it doesn’t seem to be talked about / visible to people not trawling through the code. I think that this is a pretty good testament to how successful it is. That, and the fact that it is now the baseline against which all other search libraries & engines are compared.

What I wanted to talk about today is the kind of things that Lucene is doing that don’t seem to get much publicity. I think that Spolsky said it best:

Back to that two page function. Yes, I know, it's just a simple function to display a window, but it has grown little hairs and stuff on it and nobody knows why. Well, I'll tell you why: those are bug fixes. One of them fixes that bug that Nancy had when she tried to install the thing on a computer that didn't have Internet Explorer. Another one fixes that bug that occurs in low memory conditions. Another one fixes that bug that occurred when the file is on a floppy disk and the user yanks out the disk in the middle. That LoadLibrary call is ugly but it makes the code work on old versions of Windows 95.

I remember just how much of an impact that article made on me at the time. And Lucene’s codebase bears those words out. Lucene is a search engine library, which basically means that it does:

  • Indexing
  • Querying
  • Maintenance

One of the major areas of maturity in Lucene is how it optimizes indexing. You can see it in the code. For example, Lucene goes to a great deal of trouble to avoid allocating memory willy nilly. Instead, pretty much everything there is done via object pools. This helps reduce the memory pressure when doing a lot of indexing and can save a lot of GC cycles.
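
To illustrate the object pool idea in general terms, here is a minimal sketch using Go’s sync.Pool (my own illustration, not Lucene code): buffers are reused across operations instead of being allocated and collected on every call.

package main

import (
    "bytes"
    "fmt"
    "sync"
)

// bufferPool hands out reusable buffers instead of allocating a new one per call.
var bufferPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

func indexDocument(text string) int {
    buf := bufferPool.Get().(*bytes.Buffer)
    defer bufferPool.Put(buf) // return the buffer to the pool for the next caller
    buf.Reset()
    buf.WriteString(text) // stand-in for the real analysis / indexing work
    return buf.Len()
}

func main() {
    for _, doc := range []string{"hello world", "hibernating rhinos"} {
        fmt.Println("indexed", indexDocument(doc), "bytes")
    }
}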

Another is the use of multiple threads for indexing. A lot of Lucene is built around this idea; it has a lot of per thread state that is meant to ensure that you don’t have to deal with concurrency yourself. The idea is that you can take an IndexWriter and write to it concurrently, then call commit. A lot of the work to do with indexing is CPU intensive, so that makes a lot of sense, and Lucene nicely isolates you from all of that work. There is DocumentWriterPerThread, so you can see really nice scaling effects as you throw more threads & hardware at the problem.

Usually, when people start messing with Lucene, they do that by writing analyzers, and you are sort of exposed to the memory constraints by being encouraged to use ReusableTokenStream, etc. It also has a nice pipeline architecture for doing the indexing work with filters.

On the querying side, Lucene does a lot of work to ensure that things just work. It has a Boolean Model for searches, and a Vector Space Model for ranking. Writing your own Query classes is pretty easy too, once you understand how things work, and again, this is another common place for people to extend Lucene. But there is a lot going on behind the scenes. Lucene does a lot of caching on a per segment basis, and it is quite nice; since segments are immutable, it means that you can get pretty good usage out of that.

That gives it a lot of its speed, and it means that over time, things are actually going to get faster, because more parts of the segments are in memory and cached.

Finally, we have all of the other work that Lucene does. In practice, that means things like merging segments (hopefully in the background), and keeping the overall system humming along. Unfortunately, that is also one of the places people most commonly start tinkering with when they run into perf problems. That is anything but trivial, and optimizing it requires a lot of expertise and understanding about the specific scenario you have.

And on top of that, you have everything else that already works on top of Lucene. Which is quite a lot.

As I said earlier, that is a very impressive piece of technology. That doesn’t mean that it doesn’t have its own set of problems, but that is something that I’ll discuss in detail in my next post.


Sorting with Lucene

I talked about the Lucene formula and how it calculates things using tf-idf to find the best matches. Now I want to talk about the actual sorting implementation. As it turns out, the default sorting (by relevancy) is really simple. All you need is to get the relevance score for a query, then you shove the results through a heap with a specified size. The heap takes care of maintaining the top results.
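
For illustration, here is a toy top-K collector in Go (my own sketch of the idea, not Lucene’s actual hit queue): keep a min-heap of size K, and any hit that scores higher than the current minimum pushes the minimum out, so at the end the heap holds the K best results.

package main

import (
    "container/heap"
    "fmt"
)

type minHeap []float64

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(float64)) }
func (h *minHeap) Pop() interface{} {
    old := *h
    x := old[len(old)-1]
    *h = old[:len(old)-1]
    return x
}

func main() {
    const k = 3
    scores := []float64{0.2, 0.9, 0.4, 0.7, 0.1, 0.8}
    h := &minHeap{}
    for _, s := range scores {
        if h.Len() < k {
            heap.Push(h, s)
        } else if s > (*h)[0] {
            heap.Pop(h) // drop the current minimum
            heap.Push(h, s)
        }
    }
    fmt.Println(*h) // the three best scores, in heap order
}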

So far, that is pretty simple to understand. But the question is, how do you do sorting on a field value? The answer is, not easily.

[image: screenshot not preserved]

GetStringIndex() does something very interesting. It returns a string index, which gives us:

  • A string array containing all the distinct (sorted) values for this field.
  • An int array with an entry per document in the index, holding the position of that document’s value in the string array.

Now we can compare fields by the position of their values in that array, which gives us pretty good sorting. Unfortunately, this also requires us to load all the values into memory. Let us see another example, which is probably easier to follow:

Sorting by an integer is done like this:

[image: screenshot not preserved]

Get an array (whose size matches the number of documents). We can then sort things easily, because accessing the relevant field value only requires the document id to index into the array.
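
The idea, in a toy Go sketch of my own (not Lucene code), is essentially this:

package main

import (
    "fmt"
    "sort"
)

func main() {
    // fieldValue[docID] holds the already loaded field value for each document.
    fieldValue := []int{42, 7, 19, 7, 100}

    // Sorting documents is now just two array lookups per comparison, which is
    // why loading everything into memory up front makes the sort itself cheap.
    docs := []int{0, 1, 2, 3, 4}
    sort.SliceStable(docs, func(i, j int) bool {
        return fieldValue[docs[i]] < fieldValue[docs[j]]
    })
    fmt.Println(docs) // [1 3 2 0 4]
}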

The reason Lucene does this is that it uses an inverted index, and it has no easy way of going from the field values to the list of documents that contain them. So it is easier to read all the values into memory and work with them there. I don’t like it, but off hand, I can’t think of a better way to handle this.


The Lucene formula: TF & IDF

The basis of how Lucene works is tf–idf, short for term frequency–inverse document frequency. You can read about it here. It is pretty complex, but it is explained in detail in the Lucene docs. It all boils down to:

[image: the Lucene scoring formula]

For now, I want to look at the tf() function. The default implementation is in DefaultSimilarity.Tf, and is defined as:

[image: screenshot not preserved]

That isn’t really helpful, however. Not without knowing what freq is.  And I’m pretty sure that Sqrt isn’t cheap, which probably explains this:

[image: screenshot not preserved]

So it caches the score function, and it appears that tf() is purely based on the count, not on anything else. Before I go and find out what is done with the score cache, let’s look at the weight value. This is calculated here:

[image: screenshot not preserved]

And idf stands for inverse document frequency.  That is defined by default to be:

[image: screenshot not preserved]

And the actual implementation is:

[image: screenshot not preserved]

And the caller for that is:

[image: screenshot not preserved]

So, we get the document frequency of a term, that is, the number of documents this term shows up in, and the total number of documents, and that is how we calculate the idf.
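
For reference, here is a compact sketch of what I believe the default tf() and idf() boil down to (written from memory of DefaultSimilarity, so treat the exact formulas as an assumption rather than a quote):

package main

import (
    "fmt"
    "math"
)

// tf: the more often the term appears in a document, the higher the score,
// with diminishing returns.
func tf(freq float64) float64 { return math.Sqrt(freq) }

// idf: the rarer the term is across the whole index, the more it is worth.
func idf(docFreq, numDocs int) float64 {
    return math.Log(float64(numDocs)/float64(docFreq+1)) + 1
}

func main() {
    fmt.Println(tf(4), idf(10, 1000)) // 2 and ~5.51
}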

So far, so good. But how about that doc frequency? This is one of the things that we store in the terms info file. But how does that get calculated?

I decided to test this with:

[image: screenshot not preserved]

This should generate 3 terms, two with a frequency of 1 and one with a frequency of 2. Now, to figure out how this is calculated. That part is a real mess.

During indexing, there is a manually maintained hash table that contains information about each unique term, and when we move from one document to another, we write the number of times each term appeared in the document. For fun, this is written to what I think is an in memory buffer for safe keeping, but it is really hard to follow.

Anyway, I now know enough about how that works for simple cases, where everything happens in a single segment. Now let us look at what happens when we use multiple segments. It is actually quite trivial. We just need to sum the term frequency of each term across all segments. This gets more interesting when we involve deletes. Because of the way Lucene handles deletes, it can’t really handle this scenario, and deleting a document does not remove its frequency counts for the terms that it had. That is the case until the index does a merge, and fixes everything that way.

So now I have a pretty good idea about how it handles the overall term frequency. Note that this just gives you the number of times this term has been seen across all documents. What about another important quantity, the number of times this term appears in a specific document? Those are stored in the frq file, and they are accessible during queries. This is then used in conjunction with the overall term frequency to generate the boost factor per result.


Peeking into Lucene indexing

Continuing my trip into the Lucene codebase, now I’m looking into how the indexing process happens. Interestingly enough, that is something that we never really had to look at before.

It is quite clear that Lucene is heavily geared toward utilizing additional threads to improve overall indexing speed. You can see it in the amount of per thread state that exists all over the indexing code. That is also meant to reduce memory consumption, as far as I can see.

[image: screenshot not preserved]

The real stuff happens in the ProcessDocument() where we have a chain of DocFieldConsumerPerThread and TermsHashConsumerPerThread which actually do the work.

Then the real work happens on a per field level in DocInverterPerField, where the analyzer is actually called. The process in which the analyzers return values for the text is interesting. There are “attributes” that are added to the stream, per token, and they are used to get the relevant values. I assume that this allows having different levels of analyzers.

[image: screenshot not preserved]

This way, you can have analyzers that don’t deal with offsets or positions, etc. And the actual processing of this is done in a chain that appears to be:

  • Freq Prox
  • Term Vectors

But that isn’t something that I really care about now. I want to see how Lucene writes the actual terms. This is being written in the TermsInfoWriter, which took some time to find.

[image: screenshot not preserved]

Terms are stored in prefix compressed mode (sorted, obviously), and there are the actual terms, and an index into the terms, which allows for faster seeking into the file. This is actually done here:

[image: screenshot not preserved]

This is a single term written to the file. A lot of the stuff Lucene does (prefixes, VInt, etc.) is done in the name of conserving space, and it reminds me greatly of LevelDB’s SST. In fact, the way terms are stored is pretty much an SST, except that this happens to be spread over multiple files. Pretty much the entire behavior and all the implications are the same.
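
A toy illustration of the prefix compression idea over a sorted term list (my own sketch, not Lucene’s actual on-disk encoding): each term is written as the length of the prefix it shares with the previous term, plus the remaining suffix, which is very cheap precisely because the terms are sorted.

package main

import "fmt"

func sharedPrefix(a, b string) int {
    n := 0
    for n < len(a) && n < len(b) && a[n] == b[n] {
        n++
    }
    return n
}

func main() {
    terms := []string{"raven", "ravendb", "ravenfs", "rhino"}
    prev := ""
    for _, t := range terms {
        p := sharedPrefix(prev, t)
        fmt.Printf("(%d, %q)\n", p, t[p:]) // (shared prefix length, new suffix)
        prev = t
    }
    // Output: (0, "raven") (5, "db") (5, "fs") (1, "hino")
}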

It also means that searching on this is fast, because the data is sorted, but pretty complex. And it also explains a lot about the actual exposed API that it has. I think that I have a pretty good idea on how things work now. I want to now go back up and look at specific parts of how it works… but that is going to be the next post.


The Lucene disk format

I realized lately that I wanted to know a lot more about exactly how Lucene is storing data on disk. Oh, I know the general stuff about segments and files, etc. But I wanted to know the actual bits & bytes. So I started tracing into Lucene and trying to figure out what it is doing.

And, by the way, the only thing that the Lucene.NET codebase is missing is this sign:

[image: screenshot not preserved]

At any rate, this is how Lucene writes the segment file. Note that this is done in a CRC32 signed file:

[image: screenshot not preserved]

And the info write method is:

[image: screenshot not preserved]

Today, I would probably use a JSON file for something like that (bonus point, you know if it is corrupted and it is human readable), but this code was written in 2001, so that explains it.

This is the format of the segment file, and the segments.gen file is generated using:

[image: screenshot not preserved]

Moving on to actually writing data, I created ten Lucene documents and wrote them. Then I just debugged through the code to see what would happen. It started by creating the _0.fdx and _0.fdt files. The .fdt file is for fields, the .fdx is for field indexes.

Both of those files are used when writing the stored fields. This is the empty operation, writing an unstored field.

[image: screenshot not preserved]

This is how fields are actually stored:

[image: screenshot not preserved]

And then it ends up in:

[image: screenshot not preserved]

Note that this particular data goes in the fdt file, while the fdx appears to be a quick way to go from a known document id to the relevant position in the fdt file.

As I was going through the code, I did some searches, and found a very detailed explanation of the actual file format in the docs. That is really nice and quite informative; however, just seeing how the “let us take the documents and make them searchable” part works is quite interesting. Lucene has a lot of chains of responsibility going on. And it is also quite interesting to see the design choices that were made.

Unfortunately, Lucene is very much wedded to its file format, and making changes to it isn’t going to be possible, which is a shame, since it impacts quite a lot of the way Lucene works in general.


Raft, as the Raven flies

The interesting thing about the etcd / go-raft review wasn’t so much what they did, they did things quite beautifully, but the things that I wanted them to do that they didn’t do. The actual implementation was fascinating, and for the purposes of what is required, more than adequate. I, however, have a very specific goal in mind, and that means that this is really not going to work with the given implementations.

I’d better explain first. I’m not sure if go-raft was written by the same people as etcd, but they do “smell” very much the same. It just occurred to me that, yes, they are actually both on GitHub, so I checked, and it appears that I was correct. xiangli-cmu, philips, benbjohnson and bcwaldon (among many others) have worked on both projects. The reason this is important is that it reinforces my feeling that go-raft was written mainly to serve the needs of etcd. Anyway…

The reason I think that is that the implementation is very much optimized for the level at which etcd needs it. What I mean is that there is a lot of work there to support multiple concurrent operations that are batched out to all the nodes in the cluster, then applied to the in memory model. This works because etcd is a set of key/value pairs that are shared among many nodes, but they are usually very small values, and a very small number of keys. I would be surprised if there were hundreds of thousands of keys in a single etcd installation. That just isn’t the type of thing that it is meant to do. And that reflects throughout.

For example, there is an inherent assumption throughout the codebase that the data set is small, that each value is small, etc. When sending a snapshot over the wire, there is the assumption that it can all fit in memory and that it is a good idea to do so. Even more to the point, the design where we only push entries to the nodes on heartbeats is great when you are trying to batch multiple changes together, with each change being independent of all the others (usually). However, that really doesn’t work when what you actually need is something like the sort of things that we usually deal with.

I want to emphasize that I’m really evaluating the way etcd & go-raft are built and finding them wanting for my own purposes; I think that they are doing quite well for the kind of problem that they are meant to solve.

Let us consider the kind of databases that we have. We usually have to deal with much larger values, typical documents are in the KB range, and are frequently hundreds of KB in size. We also tend to have quite a lot of them. Trying to put them all in memory (the etcd way) would be… suboptimal. Now, one option would be to try and do just that, and have each operation in RavenDB (put/delete documents) as a state machine command in Raft. The problem is that this really gets complicated very quickly, leaving aside the fact that it isn’t a general solution, and we really want to try to get to a general solution.

We don’t want to solve this once for RavenDB, and then for RavenFS, and then for… etc. And the reason that working at that level is tricky is that you need to push everything through the state machine, and everything in this case really does mean everything. That leads to a very different database design and implementation. It is also probably not really necessary. We can drop this a level or two down and, instead of dealing at the database level, deal with it at the storage layer level. This is basically just an extension of the log shipping idea. Instead of using Raft for high level commands, just use Raft to create a consensus around the sequence of writes to the journal. That means that all the nodes will have the same journal, and thus have the same data in storage.

That in turn means that an “entry” can actually be pretty large (multi MB range, sometimes). And because the journal is purely sequential, we need to issue that command to all the nodes as soon as it is submitted to the Raft state machine.
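
A minimal sketch of what such an entry could look like (my own illustration of the idea, not actual RavenDB or Voron code):

package main

import "fmt"

// JournalWriteCommand is the hypothetical Raft log entry in this scheme: not a
// high level database operation, just the raw bytes to append to the local
// storage journal, so every node ends up with an identical journal.
type JournalWriteCommand struct {
    JournalOffset int64  // where in the journal this write belongs
    Pages         []byte // the exact bytes to append (possibly several MB)
}

func main() {
    cmd := JournalWriteCommand{JournalOffset: 0, Pages: []byte("page data...")}
    fmt.Println("replicating", len(cmd.Pages), "bytes at offset", cmd.JournalOffset)
}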

Now, the reason that the go-raft implementation waits for the heartbeat is that it batches things up nicely, and really speeds up the general case. But we don’t need to do that for what we are going to do. Or, to be rather more exact, we have already done just that. That is basically what Voron’s transaction merging is all about. And since we can only handle writes on the leader, and since we will merge concurrent writes anyway, that is going to end up as very similar behavior under load, and faster without load. At least, that is what the initial thinking says.

Snapshots, also something quite important for Raft, can be handled very simply by just doing a backup/restore over the network, streaming all the data.

And the rest is just implementing Raft. :)


Reviewing go-raft, part II

In my previous post, I started to go over the go-raft implementation. After being interrupted by the need to sleep, I decided to go on with this, but I wanted to expand a bit first on the issue we discussed earlier: not checking the number of bytes read in log_entry’s Decode.

[image: screenshot not preserved]

Let us assume that we actually hit that issue, what would happen?

[image: screenshot not preserved]

The process goes like this: we try to read a value, but the Read method only returns some of the data. We explicitly ignore that, and try to use the buffer anyway. Best case scenario, we actually get an error, so we bail early. At that point, we detect the error and truncate the file. Hello data loss, nice to see you. For fun, this is the best case scenario. It is worse if we unmarshal the partial data without an error. Then we don’t have the case of “oh, we have a node that is somehow way behind”; we have the case of “this node actually applied different commands than anyone else”.

I reported this issue, and I’m interested to know if my review is in any way correct. With that said, let us move on…

getEntriesAfter gives us all the in memory entries. That is quite similar to how RavenDB handled indexing, for that matter, so it is amusing. But this applies only to in memory stuff, and it is quite interesting to see how this will interact with other parts of the codebase.

setCommitIndex is interesting. In my head, committing something means flushing it to disk. But in Raft’s terms, committing something means applying the commands. The reason it is interesting is that it has some interesting comments on edge cases. So far, I haven’t seen anything actually writing to disk, mind.

And this one gives me a headache:

[image: screenshot not preserved]

Basically, this means that we need to write the commit index to the beginning of the file. It is also an extremely unsafe operation. What happens if you crash immediately after? Did your change go through, or not? For that matter, there is nothing that prevents the OS from first writing the changes you made to the beginning of the file and only then whatever else you wrote at the end. So a crash might actually leave you with the commit pointer pointing at corrupted data. Luckily, I don’t see anything there actually calling this, though.

The truncate method makes my head ache, mostly because it does things like delete data, which makes me itchy. This is called from the server code as part of normal processing of the append entries request. What this does, in effect, is say something like: I want you to apply this log entry; make sure that your previous log entry is this one, and if it isn’t, revert back to this entry. This is how Raft ensures that all the logs are the same across the cluster.

Then we have this:

   1:  // Appends a series of entries to the log.
   2:  func (l *Log) appendEntries(entries []*protobuf.LogEntry) error {
   3:      l.mutex.Lock()
   4:      defer l.mutex.Unlock()
   5:   
   6:      startPosition, _ := l.file.Seek(0, os.SEEK_CUR)
   7:   
   8:      w := bufio.NewWriter(l.file)
   9:   
  10:      var size int64
  11:      var err error
  12:      // Append each entry but exit if we hit an error.
  13:      for i := range entries {
  14:          logEntry := &LogEntry{
  15:              log:      l,
  16:              Position: startPosition,
  17:              pb:       entries[i],
  18:          }
  19:   
  20:          if size, err = l.writeEntry(logEntry, w); err != nil {
  21:              return err
  22:          }
  23:   
  24:          startPosition += size
  25:      }
  26:      w.Flush()
  27:      err = l.sync()
  28:   
  29:      if err != nil {
  30:          panic(err)
  31:      }
  32:   
  33:      return nil
  34:  }

This seems pretty easy to follow, all told. But note the call to sync() there in line 27, and the fact that this translates down to an fsync, which is horrible for performance.

There is also appendEntry, which appears to be doing the exact same thing as appendEntries and writeEntry. I’m guessing that the difference is that appendEntries is called for a follower, and appendEntry is for a leader.

The last thing to go through in the log.go file is the compact function, which is… interesting:

// compact the log before index (including index)
func (l *Log) compact(index uint64, term uint64) error {
    var entries []*LogEntry

    l.mutex.Lock()
    defer l.mutex.Unlock()

    if index == 0 {
        return nil
    }
    // nothing to compaction
    // the index may be greater than the current index if
    // we just recovery from on snapshot
    if index >= l.internalCurrentIndex() {
        entries = make([]*LogEntry, 0)
    } else {
        // get all log entries after index
        entries = l.entries[index-l.startIndex:]
    }

    // create a new log file and add all the entries
    new_file_path := l.path + ".new"
    file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
    if err != nil {
        return err
    }
    for _, entry := range entries {
        position, _ := l.file.Seek(0, os.SEEK_CUR)
        entry.Position = position

        if _, err = entry.Encode(file); err != nil {
            file.Close()
            os.Remove(new_file_path)
            return err
        }
    }
    file.Sync()

    old_file := l.file

    // rename the new log file
    err = os.Rename(new_file_path, l.path)
    if err != nil {
        file.Close()
        os.Remove(new_file_path)
        return err
    }
    l.file = file

    // close the old log file
    old_file.Close()

    // compaction the in memory log
    l.entries = entries
    l.startIndex = index
    l.startTerm = term
    return nil
}

This code can’t actually run on Windows, which is interesting. The issue here is that it is trying to rename a file that is open on top of another file which is also open, and Windows does not allow that.
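
A sketch of a rename order that would also work on Windows (my own suggestion, not a patch from the project): close the file being replaced before renaming the new one over it, then reopen the result.

package main

import (
    "fmt"
    "os"
)

// replaceLogFile closes the current log file first, then renames the compacted
// file over it, and finally reopens it for further appends.
func replaceLogFile(current *os.File, newPath, path string) (*os.File, error) {
    if err := current.Close(); err != nil {
        return nil, err
    }
    if err := os.Rename(newPath, path); err != nil {
        return nil, err
    }
    return os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0600)
}

func main() {
    cur, _ := os.Create("raft.log")
    _ = os.WriteFile("raft.log.new", []byte("compacted entries"), 0600)
    f, err := replaceLogFile(cur, "raft.log.new", "raft.log")
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("replaced:", f.Name())
    f.Close()
}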

But the interesting thing here is what this does. We have the log file, which is the persisted state of the in memory entries collection. Every now and then, we compact it by creating a snapshot, and then we create a new file, with only the entries after the newly created snapshot position.

So far, so good, and that gives me a pretty good feeling regarding how the whole thing is structured. Next in line is the peer.go file. This represents a node’s idea about what is going on in another node in the cluster. I find the heartbeat code really interesting:

// Starts the peer heartbeat.
func (p *Peer) startHeartbeat() {
    p.stopChan = make(chan bool)
    c := make(chan bool)
    go p.heartbeat(c)
    <-c
}

// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat(flush bool) {
    p.stopChan <- flush
}

// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat(c chan bool) {
    stopChan := p.stopChan

    c <- true

    ticker := time.Tick(p.heartbeatInterval)

    debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)

    for {
        select {
        case flush := <-stopChan:
            if flush {
                // before we can safely remove a node
                // we must flush the remove command to the node first
                p.flush()
                debugln("peer.heartbeat.stop.with.flush: ", p.Name)
                return
            } else {
                debugln("peer.heartbeat.stop: ", p.Name)
                return
            }

        case <-ticker:
            start := time.Now()
            p.flush()
            duration := time.Now().Sub(start)
            p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
        }
    }
}

startHeartbeat starts a new heartbeat goroutine, and then waits until the heartbeat function notifies it (over the channel) that it has started running.

What is confusing is the reference to the peer’s server. Peer is defined as:

// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
    server            *server
    Name              string `json:"name"`
    ConnectionString  string `json:"connectionString"`
    prevLogIndex      uint64
    mutex             sync.RWMutex
    stopChan          chan bool
    heartbeatInterval time.Duration
}

And it seems logical to think that this is a remote peer’s server, but this is actually the local server reference, not the remote one. Note that it is actually the flush method that does the remote call.

Flush is defined as:

func (p *Peer) flush() {
    debugln("peer.heartbeat.flush: ", p.Name)
    prevLogIndex := p.getPrevLogIndex()
    term := p.server.currentTerm

    entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)

    if entries != nil {
        p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
    } else {
        p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
    }
}

The interesting thing here is that the entries collection might be empty (in which case this serves as just a heartbeat). Another thing that pops to mind is that this has the leader explicitly instructing the follower to generate snapshots. The Raft paper suggested that this is something that would happen locally on each server, on an independent basis.

There is a lot of interesting behavior in sendAppendEntriesRequest(), not so much in what it does as in how it handles replies. There is a lot of state going on there. It’s very well commented, so I’ll let you read it; there isn’t anything going on there that is actually complex.

What is fascinating is that while the transport layer for go-raft is HTTP, which is inherently request/response, it actually handles this in an interesting fashion:

  • Requests are synchronous
  • On reply, the in memory state of the peer is updated immediately
  • The response from the peer is queued to be handled by the server event loop

The end result is that a lot of the handling is centralized into a really pretty state machine. The rest of what is going on there is not very interesting, except for snapshots, but those are covered elsewhere.

And now, we are ready to actually go and look at the server code, but… not yet. It is over a thousand lines of code, so I think that I’ll go over other stuff first. In particular, snapshotting looks interesting.

[image: screenshot not preserved]

This is actually quite depressing. Note the State property here. There is an implicit assumption that it is possible / advisable to hold the entire state in memory like that. I know that I am sensitive to such things, but that seems like an awful lot of waste when talking about large systems.

Here is one such issue:

[image: screenshot not preserved]

Let us assume that our state is big, hundreds of MB or maybe a few GB in size.

We currently hold it in memory inside Snapshot.State, then we marshal that to JSON. Now, I actually had to go and check, but Go’s json package actually does the usual thing and encodes a byte array as a base64 formatted string. What that means, in turn, is that you have an overhead of about a third that you have to deal with, and this is all allocated in main memory. And then you write it to a file.

This is…. quite insane, to be frank.

Assuming that I have a state that is 100 MB in size, I’m going to hold all of that in memory, then allocate roughly another 133 MB just to hold the JSON state, and then write it to a file. Why not write it to a file directly in the first place? (You could compute a CRC along the way.)
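
A sketch of the streaming alternative (my own illustration, not etcd or go-raft code): write the raw state straight to the file while computing a CRC along the way, so the snapshot never needs a second in memory copy.

package main

import (
    "fmt"
    "hash/crc32"
    "io"
    "os"
    "strings"
)

func writeSnapshot(state io.Reader, path string) (uint32, error) {
    f, err := os.Create(path)
    if err != nil {
        return 0, err
    }
    defer f.Close()

    crc := crc32.NewIEEE()
    // Every chunk goes to both the file and the checksum; nothing is buffered in full.
    if _, err := io.Copy(io.MultiWriter(f, crc), state); err != nil {
        return 0, err
    }
    return crc.Sum32(), f.Sync()
}

func main() {
    sum, err := writeSnapshot(strings.NewReader("pretend this is 100 MB of state"), "snapshot.bin")
    fmt.Println(sum, err)
}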

The whole thing appears to be assuming small sizes of data. Throughout the entire codebase, actually.

And now, there is no way to avoid it any longer; we are going into server.go itself…

There is a lot of boilerplate stuff there, but the first interesting thing happens when we look at how to apply the log:

[image: screenshot not preserved]

This says, when we need to apply it, execute the command method on my state. A lot of the other methods are some variant of:

[image: screenshot not preserved]

Nothing to see here at all.

And we finally get to the key part, the event loop:

[image: screenshot not preserved]

Let us look in detail at the followerLoop. Inside that function, we have a loop that waits for one of the following (a rough sketch of the shape of such a loop appears after the list):

  1. Stop signal, which would lead to us shutting down…
  2. We got an event on our queue…
  3. The timeout for an event has expired…
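
A toy loop with that shape might look like this (my own sketch, not go-raft’s actual code):

package main

import (
    "fmt"
    "time"
)

// followerLoop waits for a stop signal, an incoming event, or an election
// timeout, whichever comes first.
func followerLoop(stop chan struct{}, events chan string, electionTimeout time.Duration) {
    for {
        select {
        case <-stop:
            fmt.Println("shutting down")
            return
        case e := <-events:
            fmt.Println("handling event:", e)
        case <-time.After(electionTimeout):
            fmt.Println("election timeout, becoming candidate")
            return
        }
    }
}

func main() {
    events := make(chan string, 1)
    events <- "append-entries"
    followerLoop(make(chan struct{}), events, 50*time.Millisecond)
}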

There is one part there that puzzles me:

[image: screenshot not preserved]

promotable will return true if the log has any entries at all. I’m not really sure why that is the case, to be honest. In particular, what about the case when we start with an empty server? I’m going to go on reading the code, and we’ll see where it leads us. And it leads us to:

[image: screenshot not preserved]

So next we need to figure out what this self join stuff is. I am not sure if that is something that comes from Raft or from an external source. I found this issue that discusses it, but it isn’t very helpful in terms of understanding who issues the self join command. I tried looking at the etcd codebase, but I didn’t find anything so far. I’ll leave it for now.

The rest of the operations are basically just forwarding the calls to the appropriate methods if they are in the allowed state.

The candidateLoop method isn’t anything special; it follows the Raft paper pretty closely, although I have to admit the “candidate becomes follower upon Append Entries command” is buried deep. The same is true for the other behaviors. The appropriate state based responses are sometimes hard to figure out, because you have the state loop, and then you have the same apparent behavior everywhere. For example, we need to become a follower if we get an Append Entries request. That happens in processAppendEntriesRequest(), but it would actually be easier to see this if we had code duplication. This is a case where getting familiar with the codebase would help understanding it, and I don’t think that this would be a change worth doing, anyway.

Probably the most interesting behavior is in the leadershipLoop when we process a command. A command is added to the server queue using a Do(Command) method. It is then processed in processCommand.

The problem here is that commands are actually appended to the log, and then sent to peers on the heartbeat interval. By default, that stands at 50 ms.

This is great and all, but it does mean that the latency for requests is going to suffer. This doesn’t matter that much for something like etcd. The assumption there is that the requests are all going to be about different things, so we can queue a lot of commands and get pretty good speed overall. It is a problem if, in our system, we have to process sequential operations. In that mode, we can’t wait until the heartbeat; we want to process this right away. I’ll discuss this later, I think. It is a very important property of this implementation (but not of Raft in general).

Looking at the snapshot state, this happens when a follower gets a SnapshotRequest, but I don’t see anywhere that sends it. Maybe it is another caller originated thing?

I just looked at the etcd source, and I think that I confirmed that both behaviors are there in the etcd source. So I think that that explains it.

And this is it, basically. I have some thoughts about the implementation of this and of etcd, but I think that this is enough for now… I’ll post them in my next post.

Reviewing go-raft, part I

After going over the etcd codebase, I decided that the Raft portion of it deserves a much stronger look. The project is here, and I am reviewing commit: 30f261bfe873561c2c75b6206ba1f62a42dbc8d6

Again, I strongly recommend reading the Raft paper. It is quite good. At any rate, assuming that you understand Raft, let us get cracking. This time, I’m reading this in Sublime Text. As usual, I’m reading in lexicographical order, and I’m starting from append_entries.go.

AppendEntries is at the very heart of Raft, so I was pleased to see it here:

// The request sent to a server to append entries to the log.
type AppendEntriesRequest struct {
    Term         uint64
    PrevLogIndex uint64
    PrevLogTerm  uint64
    CommitIndex  uint64
    LeaderName   string
    Entries      []*protobuf.LogEntry
}


// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
    pb     *protobuf.AppendEntriesResponse
    peer   string
    append bool
}

However, I didn’t really understand this code. It seemed circular, at least until I realized that we also have a whole lot of generated files. See:

[image: screenshot not preserved]

The actual protobuf semantics are (excluding a lot of stuff, of course):

message LogEntry {
    required uint64 Index=1;
    required uint64 Term=2;
    required string CommandName=3;
    optional bytes Command=4; // for nop-command
}

message AppendEntriesRequest {
    required uint64 Term=1;
    required uint64 PrevLogIndex=2;
    required uint64 PrevLogTerm=3;
    required uint64 CommitIndex=4;
    required string LeaderName=5;
    repeated LogEntry Entries=6;
}

message AppendEntriesResponse {
    required uint64 Term=1;
    required uint64 Index=2;
    required uint64 CommitIndex=3;
    required bool   Success=4;
}

So, goraft (which I always read as graft) is using protocol buffers as its wire format. Note in particular that the LogEntry contains the full content of a command, that AppendEntriesRequest has an array of them, and that the AppendEntriesResponse is set up separately. That means that it is very natural to use a one way channel for communication. Even though we do request/response, there is a high degree of separation between the request & reply. Indeed, from reading the code in etcd, I thought that was the case.

There is something that really bothers me, though. I noticed it in etcd’s codebase as well. It is things like this:

// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
    pb := &protobuf.AppendEntriesRequest{
        Term:         proto.Uint64(req.Term),
        PrevLogIndex: proto.Uint64(req.PrevLogIndex),
        PrevLogTerm:  proto.Uint64(req.PrevLogTerm),
        CommitIndex:  proto.Uint64(req.CommitIndex),
        LeaderName:   proto.String(req.LeaderName),
        Entries:      req.Entries,
    }

    p, err := proto.Marshal(pb)
    if err != nil {
        return -1, err
    }

    return w.Write(p)
}

I’m not sure about the actual semantics of memory allocations in Go, but let us assume that we had a single log entry with 1KB of command data. This means that we would have the command data:

  • Once in the LogEntry inside the AppendEntriesRequest
  • Once in the protocol buffers byte array returned from Marshal

There doesn’t appear to be any way to directly stream things. Maybe it usually deals with small amounts of data, maybe they didn’t notice, or maybe something in Go makes this very efficient, but I doubt it.

The next interesting part is Command handling. Raft is all about reaching a consensus on the order of executing a set of commands in a cluster. So it is really interesting to see it being handled with Go’s interfaces.

// Command represents an action to be taken on the replicated state machine.
type Command interface {
    CommandName() string
}

// CommandApply represents the interface to apply a command to the server.
type CommandApply interface {
    Apply(Context) (interface{}, error)
}

type CommandEncoder interface {
    Encode(w io.Writer) error
    Decode(r io.Reader) error
}

We have some additional things about serializing commands and reading them back, but nothing beyond this. The Commands.go file, however, is of a little bit more interest. Let us look at the join command:

// Join command interface
type JoinCommand interface {
    Command
    NodeName() string
}

// Join command
type DefaultJoinCommand struct {
    Name             string `json:"name"`
    ConnectionString string `json:"connectionString"`
}


// The name of the Join command in the log
func (c *DefaultJoinCommand) CommandName() string {
    return "raft:join"
}

func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
    err := server.AddPeer(c.Name, c.ConnectionString)

    return []byte("join"), err
}

func (c *DefaultJoinCommand) NodeName() string {
    return c.Name
}

I’m not sure why we have an interface for JoinCommand, and then a default implementation like that. I saw that elsewhere in etcd; it might be a Go pattern. Note that JoinCommand is an interface that embeds another interface (Command, in this case, obviously).

Note that you have the Apply function to actually handle the real work, in this case, adding a peer. There is nothing interesting in config.go, debug.go or context.go, but event.go is puzzling. To be fair, I am really at a loss to explain this style:

// Event represents an action that occurred within the Raft library.
// Listeners can subscribe to event types by using the Server.AddEventListener() function.
type Event interface {
    Type() string
    Source() interface{}
    Value() interface{}
    PrevValue() interface{}
}

// event is the concrete implementation of the Event interface.
type event struct {
    typ       string
    source    interface{}
    value     interface{}
    prevValue interface{}
}

// newEvent creates a new event.
func newEvent(typ string, value interface{}, prevValue interface{}) *event {
    return &event{
        typ:       typ,
        value:     value,
        prevValue: prevValue,
    }
}

// Type returns the type of event that occurred.
func (e *event) Type() string {
    return e.typ
}

// Source returns the object that dispatched the event.
func (e *event) Source() interface{} {
    return e.source
}

// Value returns the current value associated with the event, if applicable.
func (e *event) Value() interface{} {
    return e.value
}

// PrevValue returns the previous value associated with the event, if applicable.
func (e *event) PrevValue() interface{} {
    return e.prevValue
}

Why go to all this trouble to define things this way? It seems like a lot of boilerplate code. It would be easier to just expose a struct directly. I am assuming that this is done so you can send things other than the event struct, with additional information as well. In C# you would do that by subclassing the event, but you cannot do that in Go. A better alternative might have been to just have a tag / state field in the struct and let it go that way, though.
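
The simpler shape being suggested would look roughly like this (my own reading of the suggestion, not code from the project):

package main

import "fmt"

// Event as a plain struct with a type tag, instead of an interface plus a
// private implementation and four getter methods.
type Event struct {
    Type      string
    Source    interface{}
    Value     interface{}
    PrevValue interface{}
}

func main() {
    e := Event{Type: "leaderChange", Value: "node2", PrevValue: "node1"}
    fmt.Printf("%+v\n", e)
}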

event_dispatcher.go is just an implementation of a dictionary of string to events, nothing much beyond that. A lot of boiler plate code, too.

http_transporter.go is next, and it is a blow to my hope that this would be a one way messaging system. I’m thinking about doing Raft over ZeroMQ or NanoMSG. Here is the actual process of sending data over the wire:

// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
    var b bytes.Buffer
    if _, err := req.Encode(&b); err != nil {
        traceln("transporter.ae.encoding.error:", err)
        return nil
    }

    url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
    traceln(server.Name(), "POST", url)

    t.Transport.ResponseHeaderTimeout = server.ElectionTimeout()
    httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
    if httpResp == nil || err != nil {
        traceln("transporter.ae.response.error:", err)
        return nil
    }
    defer httpResp.Body.Close()

    resp := &AppendEntriesResponse{}
    if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
        traceln("transporter.ae.decoding.error:", err)
        return nil
    }

    return resp
}

This is very familiar territory for me, I have to say. :) Although, again, there is a lot of wasted memory here, because the data is encoded multiple times instead of being streamed directly.

And here is how it receives information:

// Handles incoming AppendEntries requests.
func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        traceln(server.Name(), "RECV /appendEntries")

        req := &AppendEntriesRequest{}
        if _, err := req.Decode(r.Body); err != nil {
            http.Error(w, "", http.StatusBadRequest)
            return
        }

        resp := server.AppendEntries(req)
        if _, err := resp.Encode(w); err != nil {
            http.Error(w, "", http.StatusInternalServerError)
            return
        }
    }
}

Really, there is nothing much to write home about, to be frank. All of the operations are like that: just encoding/decoding and forwarding the call to the right function. I’m skipping log.go in favor of going to log_entry.go for a moment. The log is really important in Raft, so I want to focus on small chewables first.

If the user doesn’t provide an encoder for a command, it will be converted using JSON, then serialized to a writer using the protocol buffers format.

One thing that I did notice that was interesting was a bug in decoding from a protocol buffer stream:

// Decodes the log entry from a buffer. Returns the number of bytes read and
// any error that occurs.
func (e *LogEntry) Decode(r io.Reader) (int, error) {

    var length int
    _, err := fmt.Fscanf(r, "%8x\n", &length)
    if err != nil {
        return -1, err
    }

    data := make([]byte, length)
    _, err = r.Read(data)

    if err != nil {
        return -1, err
    }

    if err = proto.Unmarshal(data, e.pb); err != nil {
        return -1, err
    }

    return length + 8 + 1, nil
}

Do you see the bug?

It is in the reading of the data from the reader. A reader may decide to read less than the data that was requested. In this case, I’m assuming that callers are always sending fully materialized readers to the Decode method, which is not surprising given how often this codebase creates in memory buffers for the entire dataset. Still… that isn’t a nice thing to do, and it can create the most subtle and hard to understand bugs.
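
For what it’s worth, the standard library way to avoid this is io.ReadFull, which keeps reading until the buffer is full or the stream genuinely ends (my suggested fix, not a patch from the project):

package main

import (
    "bytes"
    "fmt"
    "io"
)

// readExactly fails loudly (io.ErrUnexpectedEOF) if the reader cannot supply
// the full length, so a short read can never be silently decoded as if it
// were a complete record.
func readExactly(r io.Reader, length int) ([]byte, error) {
    data := make([]byte, length)
    if _, err := io.ReadFull(r, data); err != nil {
        return nil, err
    }
    return data, nil
}

func main() {
    _, err := readExactly(bytes.NewReader([]byte("only nine")), 32)
    fmt.Println(err) // unexpected EOF instead of a truncated buffer
}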

And now, into the Log!

// A log is a collection of log entries that are persisted to durable storage.
type Log struct {
    ApplyFunc   func(*LogEntry, Command) (interface{}, error)
    file        *os.File
    path        string
    entries     []*LogEntry
    commitIndex uint64
    mutex       sync.RWMutex
    startIndex  uint64 // the index before the first entry in the Log entries
    startTerm   uint64
}

// The results of the applying a log entry.
type logResult struct {
    returnValue interface{}
    err         error
}

There are a few things that we can notice right now. First, ApplyFunc is how we control the application of stuff to the in memory state, I am assuming. Given that applying the log can only happen after we have a consensus and probably fsynced to disk, it makes sense to invoke it from here.

Then, we also have a file, so that is where we are actually doing a lot of the interesting stuff, like actual storage IO and things like that. The in memory entries array is also interesting, mostly because I wonder just how big it gets, and when it is getting truncated. I think that the way it works, we have the log properties, which likely represent the flushed to disk state, and the entries represent the yet to be flushed state.

Things get interesting in the open method, which is called to create a new log or recover an existing one. The interesting part (recovery) is here:

// Read the file and decode entries.
for {
    // Instantiate log entry and decode into it.
    entry, _ := newLogEntry(l, nil, 0, 0, nil)
    entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)

    n, err := entry.Decode(l.file)
    if err != nil {
        if err == io.EOF {
            debugln("open.log.append: finish ")
        } else {
            if err = os.Truncate(path, readBytes); err != nil {
                return fmt.Errorf("raft.Log: Unable to recover: %v", err)
            }
        }
        break
    }
    if entry.Index() > l.startIndex {
        // Append entry.
        l.entries = append(l.entries, entry)
        if entry.Index() <= l.commitIndex {
            command, err := newCommand(entry.CommandName(), entry.Command())
            if err != nil {
                continue
            }
            l.ApplyFunc(entry, command)
        }
        debugln("open.log.append log index ", entry.Index())
    }

    readBytes += int64(n)
}

This is really interesting, because it is actually sending the raw file to the Decode function, unlike what I expected. The reason this is surprising is that there is a strong likelihood that the OS  will actually return less data than requested. As it turns out, on Windows, this will never be the case, but it does appear (at least from the contract of the API it ends up calling) that at least on Linux, that is possible. Now, I ended up going all the way to the sys call interface in linux, so I’m pretty sure that this can’t happen there either, but still…

At any rate, the code appears to be pretty clear. We read the log entry, decode it (truncating the file if there are any issues), and if need be, we apply it.

And I think that this is enough for now… it is close to 9 PM, and I need to do other things as well. I’ll get back to this in my next post.

Reviewing etcd

The etcd project is a project that I stumbled upon that looks interesting. It is a highly-available key value store for shared configuration and service discovery. It is written in Go and is implemented using Raft. I’m reviewing commit 46d817f91b2edf4141081abff7d92a4f71d39248.

I don’t know Go, and I think that this would be a great way to learn both about Raft (which I am very interested in) and about Go (which I have peeked at occasionally, but never really studied). Like some of my other posts, this is likely to be a very long and rambling one. For reading the code, I am using LiteIDE, at least for now.

This is what this looks like.

image

I usually like to do a lexicographical read through the codebase, at least at first. That means that in this case, I have to go through the docs first. Probably not a totally bad idea, but a divergence from my usual approach.

Discovery – it looks like etcd handles the problem of initial peer selection by… going to another etcd cluster that handles membership information. There is a SaaS offering, it appears (discovery.etcd.io). I like the recursive nature of that, and obviously you can set it up with a static list of peers to start with.

The first real code file I saw was this one, bench.go:

   1: package main
   2:  
   3: import (
   4:     "flag"
   5:     "log"
   6:     "strconv"
   7:  
   8:     "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
   9: )
  10:  
  11: func write(endpoint string, requests int, end chan int) {
  12:     client := etcd.NewClient([]string{endpoint})
  13:  
  14:     for i := 0; i < requests; i++ {
  15:         key := strconv.Itoa(i)
  16:         _, err := client.Set(key, key, 0)
  17:         if err != nil {
  18:             println(err.Error())
  19:         }
  20:     }
  21:     end <- 1
  22: }
  23:  
  24: func watch(endpoint string, key string) {
  25:     client := etcd.NewClient([]string{endpoint})
  26:  
  27:     receiver := make(chan *etcd.Response)
  28:     go client.Watch(key, 0, true, receiver, nil)
  29:  
  30:     log.Printf("watching: %s", key)
  31:  
  32:     received := 0
  33:     for {
  34:         <-receiver
  35:         received++
  36:     }
  37: }
  38:  
  39: func main() {
  40:     endpoint := flag.String("endpoint", "http://127.0.0.1:4001", "etcd HTTP endpoint")
  41:  
  42:     rWrites := flag.Int("write-requests", 50000, "number of writes")
  43:     cWrites := flag.Int("concurrent-writes", 500, "number of concurrent writes")
  44:  
  45:     watches := flag.Int("watches", 500, "number of writes")
  46:  
  47:     flag.Parse()
  48:  
  49:     for i := 0; i < *watches; i++ {
  50:         key := strconv.Itoa(i)
  51:         go watch(*endpoint, key)
  52:     }
  53:  
  54:     wChan := make(chan int, *cWrites)
  55:     for i := 0; i < *cWrites; i++ {
  56:         go write(*endpoint, (*rWrites / *cWrites), wChan)
  57:     }
  58:  
  59:     for i := 0; i < *cWrites; i++ {
  60:         <-wChan
  61:         log.Printf("Completed %d writes", (*rWrites / *cWrites))
  62:     }
  63: }

I include the entire file here because it is short, and really quite interesting. This is my first time really reading Go code, so I had to go and read some docs. The first interesting thing is in line 11, where we have “end chan int”, which defines a channel of integers. This allows cross-goroutine communication (for .NET guys, think tasks / TPL; that isn’t actually accurate, but it is close enough), including “waiting” for results.

The write func will write the specified number of requests, then push a number to the channel, signifying that it completed its work. That is a really nice pattern for doing work, considering that there isn’t a way to await a goroutine.

The watch func is a little hard for me to get. Mostly because, as far as I understand, it is setting up a watch for a particular key, then just accumulating the number of changes to it in a local variable, and not doing anything else with it.

The main function is really fun to read. The flag package is a fascinating way to handle parameter parsing, and it shows how deliberately Go was designed as a server side language, where command line parsing is really common. The flag package is both powerful and really simple. I think that I’ll probably make use of this approach for configuration in RavenDB.

I love that you define the flag, and then you get a pointer to where that value will be, once you call flag.Parse().

Note that calls to go [expr] are roughly equivalent to Task.Factory.StartNew([expr]) in .NET (not really, but close enough). The syntax <-wChan, for example, means “wait until there is a new value in the channel”. Presumably it translates to something like “await channel.DequeueAsync()” in C#.

Next was the config package, where etcd is initializing itself. It was interesting to learn that you can have non-global flags using the flag package, so you can use the same code for parsing arguments to a method. But that was about it. Nothing exciting there.
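For illustration only (my own sketch, not etcd’s config code), a non-global flag set looks roughly like this:

package main

import (
    "flag"
    "fmt"
)

func main() {
    // A non-global flag set: parsing is scoped to this FlagSet instead of the
    // process-wide flag.CommandLine, so the same code can parse whatever
    // []string you hand it (config file values, sub-command arguments, etc).
    fs := flag.NewFlagSet("example", flag.ContinueOnError)
    addr := fs.String("addr", "http://127.0.0.1:4001", "server address")
    verbose := fs.Bool("verbose", false, "enable verbose output")

    if err := fs.Parse([]string{"-addr", "http://10.0.0.1:4001", "-verbose"}); err != nil {
        fmt.Println("parse error:", err)
        return
    }
    fmt.Println(*addr, *verbose)
}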

I’m skipping the contrib directory because there is no Go code there, and it doesn’t seem relevant for now. I’ll note that there is a mix of shell scripts, Python and JSON in there, so I’m feeling good about ignoring it for now.

One thing that I really like in Go is that it is very easy to define “extension methods”. In fact, you usually define structs (data holders), and then you just define a function that takes one of them as the receiver argument, and you can call it using method syntax. That makes some things very natural and nice. And it also gives you a really nice separation between data & behavior.
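A trivial sketch of what I mean (my own toy example, not etcd code):

package main

import "fmt"

// Config is just a data holder.
type Config struct {
    Endpoint string
    Retries  int
}

// Describe is an ordinary function with Config as its receiver. It is defined
// separately from the struct, yet it is called with method syntax, which is
// what makes it feel like an "extension method" to a .NET developer.
func (c Config) Describe() string {
    return fmt.Sprintf("%s (retries: %d)", c.Endpoint, c.Retries)
}

func main() {
    c := Config{Endpoint: "http://127.0.0.1:4001", Retries: 3}
    fmt.Println(c.Describe())
}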

I also like the multiple return values option, which gives us a good pattern for reporting errors without getting either crazy syntax or throwing. It also makes it clear when we want to ignore errors. Look at this:

   1: func (d *Discoverer) findPeers() (peers []string, err error) {
   2:     resp, err := d.client.Get(path.Join(d.prefix), false, true)
   3:     if err != nil {
   4:         return nil, err
   5:     }
   6:  
   7:     node := resp.Node
   8:  
   9:     if node == nil {
  10:         return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
  11:     }

Trying to do this in C, for example, would lead to an explosion of arrowhead return-value checks, or the complexities of “return zero for error, then get the actual issue from GetLastError()”. Multiple return values result in much nicer code than that.

The discovery protocol itself is defined in the docs, but the code implementing it is really nice. Being able to piggyback on etcd itself to implement discovery is really neat.

The fixtures directory seems to be filled with cert files, I am not sure what for, but I’ll go directly to the http directory and see what is going on there. And there doesn’t appear to be much there, so I’m moving on to the next thing that is actually meaningful.

The metrics section is interesting, mostly because we have been doing the same thing in RavenDB recently. But it all depends on an external package, so I’ll skip it for now. The next interesting thing is in the mod folder, where we have the dashboard module (an HTML5 app, not interesting to me), and the leader & lock modules.

I’ll start with the leader module, where we actually have interesting things. The leader module is very literally just proxying stuff to the lock module. It is getting a request, translating that request to a lock module HTTP request, and executing that. Personally, I wouldn’t bother with doing this server side; I would handle this entirely client side, or by calling the lock module methods directly, instead of proxying the request to the lock module. I am not sure why this approach was chosen:

   1: // getHandler retrieves the current leader.
   2: func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) error {
   3:     vars := mux.Vars(req)
   4:  
   5:     // Proxy the request to the lock service.
   6:     url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"])
   7:     resp, err := h.client.Get(url)
   8:     if err != nil {
   9:         return err
  10:     }
  11:     defer resp.Body.Close()
  12:  
  13:     w.WriteHeader(resp.StatusCode)
  14:     io.Copy(w, resp.Body)
  15:     return nil
  16: }

The lock module is where the real stuff is happening. And at the same time, I am not sure at what level exactly this is happening. What appears to be happening is that the lock module, too, is built on top of the etcd HTTP client, rather than on the internal APIs directly. This is strange to me, because that isn’t the way I would architect it, but I am guessing that this makes it easier to work with things, having only a single real external API. On the other hand, having a server make HTTP requests to itself seems very strange to me.

One thing that really confused me was a lot of references to things that are actually defined in another repository, the Go client side of etcd. Another interesting thing is the way Go implements interfaces. Instead of requiring an explicit declaration, if a type has all the methods of an interface, it implements that interface.
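A minimal illustration of that (again, my own toy example, not etcd code):

package main

import "fmt"

// Notifier is an interface; nothing below ever declares that it implements it.
type Notifier interface {
    Notify(msg string) error
}

// consoleNotifier satisfies Notifier simply by having a Notify method with the
// right signature. The compiler checks this structurally at the point of
// assignment, no "implements" declaration required.
type consoleNotifier struct{}

func (consoleNotifier) Notify(msg string) error {
    _, err := fmt.Println("notify:", msg)
    return err
}

func main() {
    var n Notifier = consoleNotifier{}
    n.Notify("hello")
}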

At this point I decided that I wanted a better IDE and spent some time getting IntelliJ to work with Go. It supports this, and you even get some reference tracking. I couldn’t get all of it to work; in particular, external references weren’t tracked, and I didn’t really care to see why, so I just left it:

image

At any rate, I was reading the lock module code. In particular, I am now tracking the acquire_handler.go file. It has a major function (acquireHandler)* that actually handles the process of acquiring the lock.

* Sidenote: I like the structure of the code so far. In most files, we have one main function, and some supporting functions to help it do its work. It is nice, simple and quite easy to follow.

The first thing that is done is syncing the cluster information. This is done by going to any of the machines that we already know about and asking them about the current state of the cluster. We take the first response, and presumably, since we are running server side, the first response would always be from us (assuming that the requests end up going to the leader). So there isn’t another machine-boundary request, but it is still very strange to read it going through so many client operations.

This code is really interesting:

   1:  
   2:     // Setup connection watcher.
   3:     closeNotifier, _ := w.(http.CloseNotifier)
   4:     closeChan := closeNotifier.CloseNotify()
   5:     stopChan := make(chan bool)

In C#, that would be setting up a cancellation token that fires when the request is abandoned by the client, or when we complete some work.

   1: // If node exists then just watch it. Otherwise create the node and watch it.           
   2: node, index, pos := h.findExistingNode(keypath, value)                                  
   3: if index > 0 {                                                                          
   4:     if pos == 0 {                                                                          
   5:         // If lock is already acquired then update the TTL.                                   
   6:         h.client.Update(node.Key, node.Value, uint64(ttl))                                    
   7:     } else {                                                                               
   8:         // Otherwise watch until it becomes acquired (or errors).                             
   9:         err = h.watch(keypath, index, nil)                                                    
  10:     }                                                                                      
  11: } else {                                                                                
  12:     index, err = h.createNode(keypath, value, ttl, closeChan, stopChan)                    
  13: }                                                                                       

This is interesting. I am not really able to follow what is going on in the existing-node case (index > 0) yet. Let us look at what happens with createNode…

   1: // createNode creates a new lock node and watches it until it is acquired or acquisition fails.
   2: func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-chan bool, stopChan chan bool) (int, error) {
   3:     // Default the value to "-" if it is blank.
   4:     if len(value) == 0 {
   5:         value = "-"
   6:     }
   7:  
   8:     // Create an incrementing id for the lock.
   9:     resp, err := h.client.AddChild(keypath, value, uint64(ttl))
  10:     if err != nil {
  11:         return 0, err
  12:     }
  13:     indexpath := resp.Node.Key
  14:     index, _ := strconv.Atoi(path.Base(indexpath))
  15:  
  16:     // Keep updating TTL to make sure lock request is not expired before acquisition.
  17:     go h.ttlKeepAlive(indexpath, value, ttl, stopChan)
  18:  
  19:     // Watch until we acquire or fail.
  20:     err = h.watch(keypath, index, closeChan)
  21:  
  22:     // Check for connection disconnect before we write the lock index.
  23:     if err != nil {
  24:         select {
  25:         case <-closeChan:
  26:             err = errors.New("user interrupted")
  27:         default:
  28:         }
  29:     }
  30:  
  31:     // Update TTL one last time if acquired. Otherwise delete.
  32:     if err == nil {
  33:         h.client.Update(indexpath, value, uint64(ttl))
  34:     } else {
  35:         h.client.Delete(indexpath, false)
  36:     }
  37:  
  38:     return index, err
  39: }

In line 9, we create a child of the key path. Assuming that the key path is foo, this will create items named foo/1, foo/2, etc. Effectively an auto-incrementing number (with no guarantees on the size of the step, mind).

In line 17 we make sure that we keep this alive, the ttlKeepAlive function is really fun to read:

   1: // ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
   2: func (h *handler) ttlKeepAlive(k string, value string, ttl int, stopChan chan bool) {
   3:     for {
   4:         select {
   5:         case <-time.After(time.Duration(ttl/2) * time.Second):
   6:             h.client.Update(k, value, uint64(ttl))
   7:         case <-stopChan:
   8:             return
   9:         }
  10:     }
  11: }

The C# translation for this would be:

public async Task TtlKeepAlive(string k, string value, int ttl, CancellationToken t)
{
    while (true)
    {
        try
        {
            // The Go version waits ttl/2 seconds between updates.
            await Task.Delay(TimeSpan.FromSeconds(ttl / 2.0), t);
        }
        catch (TaskCanceledException)
        {
            return; // cancellation plays the role of the stop channel
        }
        client.Update(k, value, ttl);
    }
}

But I really like this Go select syntax. It is very much like Erlang’s pattern matching on receive. At any rate, it seems that the magic happens in the watch method.

   1: // watch continuously waits for a given lock index to be acquired or until lock fails.
   2: // Returns a boolean indicating success.
   3: func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error {
   4:     // Wrap close chan so we can pass it to Client.Watch().
   5:     stopWatchChan := make(chan bool)
   6:     stopWrapChan := make(chan bool)
   7:     go func() {
   8:         select {
   9:         case <-closeChan:
  10:             stopWatchChan <- true
  11:         case <- stopWrapChan:
  12:             stopWatchChan <- true
  13:         case <- stopWatchChan:
  14:         }
  15:     }()
  16:     defer close(stopWrapChan)
  17:  
  18:     for {
  19:         // Read all nodes for the lock.
  20:         resp, err := h.client.Get(keypath, true, true)
  21:         if err != nil {
  22:             return fmt.Errorf("lock watch lookup error: %s", err.Error())
  23:         }
  24:         nodes := lockNodes{resp.Node.Nodes}
  25:         prevIndex := nodes.PrevIndex(index)
  26:  
  27:         // If there is no previous index then we have the lock.
  28:         if prevIndex == 0 {
  29:             return nil
  30:         }
  31:  
  32:         // Watch previous index until it's gone.
  33:         waitIndex := resp.Node.ModifiedIndex
  34:  
  35:         // Since event store has only 1000 histories we should use first node's CreatedIndex if available
  36:         if firstNode := nodes.First(); firstNode != nil {
  37:             waitIndex = firstNode.CreatedIndex
  38:         }
  39:  
  40:         _, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
  41:         if err == etcd.ErrWatchStoppedByUser {
  42:             return fmt.Errorf("lock watch closed")
  43:         } else if err != nil {
  44:             return fmt.Errorf("lock watch error: %s", err.Error())
  45:         }
  46:     }
  47: }

I have to admit, this makes my head hurt, just a little bit.

But first, the defer syntax Go has is really nice. It is very similar to C#’s using statements, but it isn’t limited to just a specific interface, and it doesn’t introduce a nesting block.
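For readers who, like me, come from C#, a trivial sketch of defer (my own example, not from the codebase):

package example

import (
    "fmt"
    "os"
)

func printPrefix(path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    // Runs when printPrefix returns, on every return path. That is roughly
    // what a using block gives you in C#, but without the extra nesting and
    // without requiring a particular interface; any function call will do.
    defer f.Close()

    buf := make([]byte, 64)
    n, err := f.Read(buf)
    if err != nil {
        return err
    }
    fmt.Printf("%s\n", buf[:n])
    return nil
}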

The goroutine in line 7 is interesting. It will wait for a notification from the close channel, the stop watch channel or the wrap channel. And it will forward all of those to the stop watch channel. I’m really not sure why this is the case, but let’s go with this for now.

The really interesting work happens in the for loop. We get all the keys in the specified key path. Note that we assume that we only have numeric keys in that “directory”. And we basically try to find if there is any value that comes before our value.

The easiest way to think about it is the way you wait in line in any government queue. You take a number, and you’re first if there is no one with an earlier number than you.

The interesting bit is how Watch is handled. It is basically going to do a long poll request from the server, and the stopWatchChan is used to notify the Watch method when the user cancelled the request, so we don’t need this any longer. I’m really not sure why there is a need for stopWrapChan, but… at least now I understand what is going on here. We use the numbering system to effectively join a queue. Then we wait until we are at the head of the queue.

Let us go back to the actual acquireHandler routine, more specifically to the findExistingNode() behavior. If we specified a value, we try to find an existing entry in the path that already has this value. If there isn’t one, we go back to the “take a number, wait” approach. If there is a value, however, I don’t follow the logic. The findExistingNode() has three return values: the relevant node with the value, the index (the queue #, effectively), and the position of the specified node in the queue.

My problem is that I don’t understand the logic here. We find a node with the same value as we want, then we check that it is the first in the queue? What happens when we have two clients issuing the same request at the same time? I understand what happens, I don’t understand what the intention is.

As an aside, I think that I understand why a lot of the internal work in the lock module is done over the HTTP layer. The idea here is to only handle the distribution once. And if you route everything through the HTTP interface, that would be it. This way, you don’t have to worry about how to handle consistency, or stuff like that. And the idea is that you have a simple HTTP interface for a complex system like locking. My own preference would be to do this entirely client side, with no server side behavior, but that puts a lot of the onus on the clients, and it is easier to implement server side if you have a lot of clients for many environments.

Anyway, I think that I found the two pieces that really interests me:

imageimage

 

Store is probably the on disk storage, something that is very near & dear to my heart. Server is the piece that I’ll probably learn the most from…

I’ll start with going over the storage stuff, since that is probably the most familiar to me. Here is the interface for the store:

   1: type Store interface {
   2:     Version() int
   3:     CommandFactory() CommandFactory
   4:     Index() uint64
   5:  
   6:     Get(nodePath string, recursive, sorted bool) (*Event, error)
   7:     Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
   8:     Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
   9:     Create(nodePath string, dir bool, value string, unique bool,
  10:         expireTime time.Time) (*Event, error)
  11:     CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  12:         value string, expireTime time.Time) (*Event, error)
  13:     Delete(nodePath string, recursive, dir bool) (*Event, error)
  14:     CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  15:  
  16:     Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
  17:  
  18:     Save() ([]byte, error)
  19:     Recovery(state []byte) error
  20:  
  21:     TotalTransactions() uint64
  22:     JsonStats() []byte
  23:     DeleteExpiredKeys(cutoff time.Time)
  24: }

This is really interesting, because from the interface alone you can see some really interesting things. For example, the Save() method. That doesn’t match any transactional store interface that I can think of. To be fair, it looks like it is used for snapshots more than anything else, but still…

These appear to be the core elements of the store:

   1: type store struct {                                                     
   2:     Root           *node                                                   
   3:     WatcherHub     *watcherHub                                             
   4:     CurrentIndex   uint64                                                  
   5:     Stats          *Stats                                                  
   6:     CurrentVersion int                                                     
   7:     ttlKeyHeap     *ttlKeyHeap  // need to recovery manually               
   8:     worldLock      sync.RWMutex // stop the world lock                     
   9: }                                                                       
  10:  
  11:  
  12: // node is the basic element in the store system.
  13: // A key-value pair will have a string value
  14: // A directory will have a children map
  15: type node struct {
  16:     Path string
  17:  
  18:     CreatedIndex  uint64
  19:     ModifiedIndex uint64
  20:  
  21:     Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
  22:  
  23:     ExpireTime time.Time
  24:     ACL        string
  25:     Value      string           // for key-value pair
  26:     Children   map[string]*node // for directory
  27:  
  28:     // A reference to the store this node is attached to.
  29:     store *store
  30: }

Again, the ability to return multiple values is really nice, see methods such as:

   1: // Read function gets the value of the node.
   2: // If the receiver node is not a key-value pair, a "Not A File" error will be returned.
   3: func (n *node) Read() (string, *etcdErr.Error) {
   4:     if n.IsDir() {
   5:         return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
   6:     }
   7:  
   8:     return n.Value, nil
   9: }

This is quite a lovely way to handle things.

However, looking at the Store directory, I am seeing a lot of stuff about modifying the in memory state, but nothing about persistence. I think that this is all handled via Raft. So I’ll just move on to reading the server side code now.

This is the Server interface:

   1: type Server interface {
   2:     State() string
   3:     Leader() string
   4:     CommitIndex() uint64
   5:     Term() uint64
   6:     PeerURL(string) (string, bool)
   7:     ClientURL(string) (string, bool)
   8:     Store() store.Store
   9:     Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
  10: }

And then we have the actual request handling. I chose to look at the simplest thing, just looking at how we process a read request:

   1: func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
   2:     vars := mux.Vars(req)
   3:     key := "/" + vars["key"]
   4:  
   5:     // Help client to redirect the request to the current leader
   6:     if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
   7:         leader := s.Leader()
   8:         hostname, _ := s.ClientURL(leader)
   9:  
  10:         url, err := url.Parse(hostname)
  11:         if err != nil {
  12:             log.Warn("Redirect cannot parse hostName ", hostname)
  13:             return err
  14:         }
  15:         url.RawQuery = req.URL.RawQuery
  16:         url.Path = req.URL.Path
  17:  
  18:         log.Debugf("Redirect consistent get to %s", url.String())
  19:         http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect)
  20:         return nil
  21:     }
  22:  
  23:     recursive := (req.FormValue("recursive") == "true")
  24:     sort := (req.FormValue("sorted") == "true")
  25:     waitIndex := req.FormValue("waitIndex")
  26:     stream := (req.FormValue("stream") == "true")
  27:  
  28:     if req.FormValue("wait") == "true" {
  29:         return handleWatch(key, recursive, stream, waitIndex, w, s)
  30:     }
  31:  
  32:     return handleGet(key, recursive, sort, w, s)
  33: }

If we require consistency, and we aren’t the leader, we’ll forward to the leader. Otherwise, we’ll process the request. Let us assume for now that we are doing a simple get, not a watch; this gives us:

   1: func handleGet(key string, recursive, sort bool, w http.ResponseWriter, s Server) error {
   2:     event, err := s.Store().Get(key, recursive, sort)
   3:     if err != nil {
   4:         return err
   5:     }
   6:  
   7:     writeHeaders(w, s)
   8:     b, _ := json.Marshal(event)
   9:     w.Write(b)
  10:     return nil
  11: }
  12:  
  13: func writeHeaders(w http.ResponseWriter, s Server) {
  14:     w.Header().Set("Content-Type", "application/json")
  15:     w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
  16:     w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
  17:     w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
  18:     w.WriteHeader(http.StatusOK)
  19: }

As you can see, we are basically just getting the current state from the in memory store, and handle that. It would be more interesting to look at how we handle waiting for a value to change, however:

   1: func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, s Server) error {
   2:     // Create a command to watch from a given index (default 0).
   3:     var sinceIndex uint64 = 0
   4:     var err error
   5:  
   6:     if waitIndex != "" {
   7:         sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
   8:         if err != nil {
   9:             return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
  10:         }
  11:     }
  12:  
  13:     watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
  14:     if err != nil {
  15:         return err
  16:     }
  17:  
  18:     cn, _ := w.(http.CloseNotifier)
  19:     closeChan := cn.CloseNotify()
  20:  
  21:     writeHeaders(w, s)
  22:  
  23:     if stream {
  24:         // watcher hub will not help to remove stream watcher
  25:         // so we need to remove here
  26:         defer watcher.Remove()
  27:         for {
  28:             select {
  29:             case <-closeChan:
  30:                 return nil
  31:             case event, ok := <-watcher.EventChan:
  32:                 if !ok {
  33:                     // If the channel is closed this may be an indication of
  34:                     // that notifications are much more than we are able to
  35:                     // send to the client in time. Then we simply end streaming.
  36:                     return nil
  37:                 }
  38:  
  39:                 b, _ := json.Marshal(event)
  40:                 _, err := w.Write(b)
  41:                 if err != nil {
  42:                     return nil
  43:                 }
  44:                 w.(http.Flusher).Flush()
  45:             }
  46:         }
  47:     }
  48:  
  49:     select {
  50:     case <-closeChan:
  51:         watcher.Remove()
  52:     case event := <-watcher.EventChan:
  53:         b, _ := json.Marshal(event)
  54:         w.Write(b)
  55:     }
  56:     return nil
  57: }

The store’s Watch() method is actually interesting, because it exposes some interesting Go concepts (handling full channels, channels for communication, etc). But the important thing is that this will simply wait for a change to happen in the in memory state, and when such a thing happens, it will put a value in the watcher.EventChan channel. So the logic here goes like this:

  • Setup a watch on the in memory state.
  • Wait for either:
    • A change in the items we watch
    • The user abandoning the request

There is some interesting stuff here regarding one-time watches versus streaming watches, but that appears to be quite easy to figure out and follow.

One thing that I can tell you, from my own experience, is that I would actually expect this to have serious issues in production. In particular, web servers can decide that a request takes too long and just abort it (for example, IIS behaves in this manner), or decide that it timed out. That obviously depends on the server side implementation, and I’m willing to assume that this isn’t the case for whatever HTTP stack etcd uses. Clients, however, are another matter. Most clients would give you the benefit of the doubt, but they would abort the request after a while, usually 15 seconds. That might be okay for some purposes, but especially if you want to handle streaming, that isn’t really going to cut it.

More to the point, for long requests, this can cause issues for proxies, firewalls, etc. They’ll decide that the request is dead, and shut it down even if you handled it on both ends properly. With RavenDB, we have a remarkably similar system, but our streaming notifications also incorporate the idea of heartbeat messages. Those are sent every now and then strictly to make sure that you’ll get something client side, and that makes all the infrastructure, client side code, etc. much, much happier.
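To show what I mean, here is a rough sketch of a streaming handler that interleaves heartbeats with real events (my own code, not RavenDB’s or etcd’s; the heartbeat payload and channel shapes are just assumptions):

package example

import (
    "fmt"
    "net/http"
    "time"
)

// streamWithHeartbeat writes events from the channel to the response, and
// emits a small heartbeat line every few seconds so proxies, firewalls and
// client timeouts don't decide that the connection has gone idle.
func streamWithHeartbeat(w http.ResponseWriter, events <-chan string, done <-chan struct{}) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }
    heartbeat := time.NewTicker(5 * time.Second)
    defer heartbeat.Stop()

    for {
        select {
        case <-done:
            return
        case e, ok := <-events:
            if !ok {
                return
            }
            fmt.Fprintln(w, e)
            flusher.Flush()
        case <-heartbeat.C:
            fmt.Fprintln(w, `{"heartbeat":true}`)
            flusher.Flush()
        }
    }
}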

Enough with the small stuff, let us look at how we handle more complex things. I now intend to take a look at the POST handler. POST operations in etcd have the following format:

curl http://127.0.0.1:4001/v2/keys/queue -XPOST -d value=Job1

The idea is that this will create an automatically named key such as queue/15, queue/853, etc. The POST handler is interesting, because here it is in its entirety.

   1:  
   2: func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
   3:     vars := mux.Vars(req)
   4:     key := "/" + vars["key"]
   5:  
   6:     value := req.FormValue("value")
   7:     dir := (req.FormValue("dir") == "true")
   8:     expireTime, err := store.TTL(req.FormValue("ttl"))
   9:     if err != nil {
  10:         return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
  11:     }
  12:  
  13:     c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, true)
  14:     return s.Dispatch(c, w, req)
  15: }

The CreateCreateCommand just creates a data object that holds the parameters, and then we dispatch it. I’m thinking that we can learn quite a lot from how that works.

Dispatch merely sends the command to the leader. This is the relevant code if we are not the leader:

   1: leader := ps.raftServer.Leader()                                         
   2:                                                                          
   3: // No leader available.                                                  
   4: if leader == "" {                                                        
   5:     return etcdErr.NewError(300, "", s.Store().Index())                     
   6: }                                                                        
   7:                                                                          
   8: var url string                                                           
   9: switch c.(type) {                                                        
  10: case *JoinCommand, *RemoveCommand:                                       
  11:     url, _ = ps.registry.PeerURL(leader)                                    
  12: default:                                                                 
  13:     url, _ = ps.registry.ClientURL(leader)                                  
  14: }                                                                        
  15: uhttp.Redirect(url, w, req)                                              
  16:                                                                          
  17: return nil                                                               

So basically, if there isn’t a leader, we error. That can happen if we have a network split and we are in the minority portion, for example. But usually we’ll just redirect you to the right server to use. But here is the interesting part, where we are the leader, and get to do stuff:

   1: result, err := ps.raftServer.Do(c)
   2: if err != nil {
   3:     return err
   4: }
   5:  
   6: if result == nil {
   7:     return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
   8: }
   9:  
  10: // response for raft related commands[join/remove]
  11: if b, ok := result.([]byte); ok {
  12:     w.WriteHeader(http.StatusOK)
  13:     w.Write(b)
  14:     return nil
  15: }
  16:  
  17: var b []byte
  18: if strings.HasPrefix(req.URL.Path, "/v1") {
  19:     b, _ = json.Marshal(result.(*store.Event).Response(0))
  20:     w.WriteHeader(http.StatusOK)
  21: } else {
  22:     e, _ := result.(*store.Event)
  23:     b, _ = json.Marshal(e)
  24:  
  25:     w.Header().Set("Content-Type", "application/json")
  26:     // etcd index should be the same as the event index
  27:     // which is also the last modified index of the node
  28:     w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index()))
  29:     w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
  30:     w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
  31:  
  32:     if e.IsCreated() {
  33:         w.WriteHeader(http.StatusCreated)
  34:     } else {
  35:         w.WriteHeader(http.StatusOK)
  36:     }
  37: }
  38:  
  39: w.Write(b)
  40:  
  41: return nil

The ps variable is something called a PeerServer, and I haven’t checked it yet. But all this code is basically doing is: “send this to Raft to Do something about it, then reply to the caller”. So let us look at what we are actually doing there. The Do method merely calls the send() method, which looks like this:

   1: // Sends an event to the event loop to be processed. The function will wait
   2: // until the event is actually processed before returning.
   3: func (s *server) send(value interface{}) (interface{}, error) {
   4:     event := &ev{target: value, c: make(chan error, 1)}
   5:     s.c <- event
   6:     err := <-event.c
   7:     return event.returnValue, err
   8: }

Personally, I think that this is very interesting, and again, very much like the way you would structure an Erlang system. Of particular interest is the idea of an event loop. That would be the s.c channel, and I assume that this is meant for a separate goroutine that is processing work on top of that. We ended up with transaction merging in Voron using pretty much the same system.

The s.c channel is a channel of ev pointers. And ev is defined as:

   1:  
   2: // An internal event to be processed by the server's event loop.
   3: type ev struct {
   4:     target      interface{}
   5:     returnValue interface{}
   6:     c           chan error
   7: }

The interface{} definition is Go’s System.Object, basically. And the error channel is there to mark when we are done processing the event, I assume. I would structure it so we can send a nil error as completion, and I bet that this is how this is done.
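As a rough illustration of the pattern (my own sketch, not the go-raft code), the consuming side is a single goroutine that owns the state, processes one request at a time, and signals completion on the per-request channel:

package main

import "fmt"

// request mirrors the shape of go-raft's ev: a payload, a slot for the return
// value, and a channel used to signal completion (nil means success).
type request struct {
    target      string
    returnValue int
    done        chan error
}

func main() {
    c := make(chan *request)

    // The event loop: the only goroutine that touches the counter, so no
    // locking is needed; requests are serialized by the channel.
    go func() {
        counter := 0
        for req := range c {
            counter++
            req.returnValue = counter
            req.done <- nil // a non-nil error here would report a failure
        }
    }()

    // The send side, equivalent to the send() method above: enqueue the
    // request, then block until the loop has processed it.
    req := &request{target: "increment", done: make(chan error, 1)}
    c <- req
    if err := <-req.done; err == nil {
        fmt.Println("processed, value =", req.returnValue)
    }
}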

I’m currently assuming that this is being read from the loop() method. And while I usually don’t just comment on comments, this one is really nice:

   1:  
   2: //--------------------------------------
   3: // Event Loop
   4: //--------------------------------------
   5:  
   6: //               ________
   7: //            --|Snapshot|                 timeout
   8: //            |  --------                  ______
   9: // recover    |       ^                   |      |
  10: // snapshot / |       |snapshot           |      |
  11: // higher     |       |                   v      |     recv majority votes
  12: // term       |    --------    timeout    -----------                        -----------
  13: //            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
  14: //                 --------               -----------                        -----------
  15: //                    ^          higher term/ |                         higher term |
  16: //                    |            new leader |                                     |
  17: //                    |_______________________|____________________________________ |

This comment promises an interesting function to read…

   1:  
   2: func (s *server) loop() {
   3:     defer s.debugln("server.loop.end")
   4:  
   5:     for {
   6:         state := s.State()
   7:  
   8:         s.debugln("server.loop.run ", state)
   9:         switch state {
  10:         case Follower:
  11:             s.followerLoop()
  12:  
  13:         case Candidate:
  14:             s.candidateLoop()
  15:  
  16:         case Leader:
  17:             s.leaderLoop()
  18:  
  19:         case Snapshotting:
  20:             s.snapshotLoop()
  21:  
  22:         case Stopped:
  23:             s.stopped <- true
  24:             return
  25:         }
  26:     }
  27: }

Let us start by looking at the leaderLoop behavior:

   1: func (s *server) leaderLoop() {
   2:     s.setState(Leader)
   3:     logIndex, _ := s.log.lastInfo()
   4:  
   5:     // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
   6:     s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
   7:     for _, peer := range s.peers {
   8:         peer.setPrevLogIndex(logIndex)
   9:         peer.startHeartbeat()
  10:     }
  11:  
  12:     // Commit a NOP after the server becomes leader. From the Raft paper:
  13:     // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
  14:     // each server; repeat during idle periods to prevent election timeouts
  15:     // (§5.2)". The heartbeats started above do the "idle" period work.
  16:     go s.Do(NOPCommand{})
  17:  
  18:     // Begin to collect response from followers
  19:     for s.State() == Leader {
  20:         var err error
  21:         select {
  22:         case e := <-s.c:
  23:             if e.target == &stopValue {
  24:                 // Stop all peers before stop
  25:                 for _, peer := range s.peers {
  26:                     peer.stopHeartbeat(false)
  27:                 }
  28:                 s.setState(Stopped)
  29:             } else {
  30:                 switch req := e.target.(type) {
  31:                 case Command:
  32:                     s.processCommand(req, e)
  33:                     continue
  34:                 case *AppendEntriesRequest:
  35:                     e.returnValue, _ = s.processAppendEntriesRequest(req)
  36:                 case *AppendEntriesResponse:
  37:                     s.processAppendEntriesResponse(req)
  38:                 case *RequestVoteRequest:
  39:                     e.returnValue, _ = s.processRequestVoteRequest(req)
  40:                 }
  41:             }
  42:  
  43:             // Callback to event.
  44:             e.c <- err
  45:         }
  46:     }
  47:  
  48:     s.syncedPeer = nil
  49: }

To be perfectly frank, this is really good code. I am loving the structure here. It is really fun to go through and figure out. And the code follows the Raft paper really closely. And… it appears that at some point I actually moved off the etcd code and into the go-raft codebase. I think that I’ll skip doing the Raft stuff for this blog post. It is long enough already, so I’ll just focus on the etcd stuff for now.

The part that we really care for this blog post about is the processCommand call:

   1: // Processes a command.
   2: func (s *server) processCommand(command Command, e *ev) {
   3:     s.debugln("server.command.process")
   4:  
   5:     // Create an entry for the command in the log.
   6:     entry, err := s.log.createEntry(s.currentTerm, command, e)
   7:  
   8:     if err != nil {
   9:         s.debugln("server.command.log.entry.error:", err)
  10:         e.c <- err
  11:         return
  12:     }
  13:  
  14:     if err := s.log.appendEntry(entry); err != nil {
  15:         s.debugln("server.command.log.error:", err)
  16:         e.c <- err
  17:         return
  18:     }
  19:  
  20:     s.syncedPeer[s.Name()] = true
  21:     if len(s.peers) == 0 {
  22:         commitIndex := s.log.currentIndex()
  23:         s.log.setCommitIndex(commitIndex)
  24:         s.debugln("commit index ", commitIndex)
  25:     }
  26: }

In createEntry, we effectively serialize the command into JSON, and appendEntry writes it to a file. (I finally found the serialization format; it is JSON for the commands, wrapped in a protobuf envelope.) As an aside, if this was C# code, I would be very worried about the cost of all those allocations. The data is first moved to a JSON buffer, then into a protocol buffer entry, where it is marshaled into another buffer, and only then is it written to the actual file. That is pretty prevalent in the codebase, to be honest. But again, this is Raft stuff that is going on, not etcd stuff. So we’ll ignore this for now and try to see where we actually get to apply the command against our own internal state.

I had to jump through some hoops to figure it out. In particular, commands are applied during recovery, or when we are actually committing the state following a quorum, and this is happening in the Log.ApplyFunc, which is set up externally, and… Anyway, what we actually do is this:

   1: // Apply command to the state machine.                                 
   2: switch c := c.(type) {                                                 
   3: case CommandApply:                                                     
   4:     return c.Apply(&context{                                              
   5:         server:       s,                                                     
   6:         currentTerm:  s.currentTerm,                                         
   7:         currentIndex: s.log.internalCurrentIndex(),                          
   8:         commitIndex:  s.log.commitIndex,                                     
   9:     })                                                                    
  10: case deprecatedCommandApply:                                           
  11:     return c.Apply(s)                                                     
  12: default:                                                               
  13:     return nil, fmt.Errorf("Command does not implement Apply()")          
  14: }                                                                      

And that goes all the way back to the CreateCommand’s Apply function, which does:

   1: // Create node                                                                                      
   2: func (c *CreateCommand) Apply(context raft.Context) (interface{}, error) {                          
   3:     s, _ := context.Server().StateMachine().(store.Store)                                              
   4:                                                                                                     
   5:     e, err := s.Create(c.Key, c.Dir, c.Value, c.Unique, c.ExpireTime)                                  
   6:                                                                                                     
   7:     if err != nil {                                                                                    
   8:         log.Debug(err)                                                                                    
   9:         return nil, err                                                                                   
  10:     }                                                                                                  
  11:                                                                                                     
  12:     return e, nil                                                                                      
  13: }                                                                                                   
  14:                                                                                                     

So, basically, we have Raft that does the hard work of getting a Quorum, persistence, etc.  The etcd server is responsible for the in memory state, defining commands, etc.

The really interesting part from my perspective is that we need to process erroneous entries as well, in the same manner. For example, let us say that I want to create a new entry, but only if it isn’t already there. The way it works, even though I know that this would be an error, I have to run this through Raft, get a consensus that we can apply this command, and then we apply the command, see that it is wrong, and return an error. That error leaves no state changes, but it still had to go through the Raft process, it is going to be in the log forever, etc. I’m guessing that the percentage of erroneous commands is low, to be able to tolerate that.

And, at any rate, that pretty much concludes my review of etcd. It comes to about 20 pages or so, according to my math, and that is quite enough. On the other hand, it might have been 7 posts, instead. I would really like to get some feedback on which option you like more, dear reader.

Next, I’m going to go over go-raft, I have some thoughts about this, but I’ll keep them for my next post.

As a side note, I am not, by any means, an experienced Go developer. I hadn’t even read Go code beyond Hello World before starting to read the etcd codebase. But I can tell you that this is a very nice codebase to look at. It is clear, nicely laid out, and it is possible to go through everything and understand what is going on easily.

Reviewing Metrics .NET project

This is a review of the Metrics.NET project (commit cb52da325c0a88336e09412638f72620d9ba7992).

The project is supposed to give us a way to track metrics about our applications, and we want to make use of it in RavenDB instead of the highly unreliable performance counters. This is going to be a pretty short review, mostly because I don’t really have much to say. There are a few things that I take issue with (async tasks using Thread.Sleep instead of Task.Delay, but that is probably because it is targeting .NET 4.5).

Other than that, most of the code is actually doing crazy math stuff, and there are proper links to the explanations, and you can see why it is doing so. The impressive thing is that pretty much everything that I wanted to do was already there. Including an easy way to expose metrics over the wire, and that the whole things seems pretty seamless.

Very good work, all around.

For our purposes, however, I think we’ll need to do some other things. In particular, one of the major assumptions throughout the code is that there is always a type associated with a metric, which isn’t the case for our purposes.  More than that, the code now assumes a static and fixed set of metrics for the entire system. That doesn’t work very well for us when we have different metrics for each database, so we’ll probably need to change that as well.

But I am very impressed, this looks like it could sort out a lot of the things that we need to do very quickly.


Fail, fail, fail

Sometimes, reading candidates’ answers is just something that I know is going to piss me off.

We have a question that goes something like this (the actual question is much more detailed):

We have a 15TB csv file that contains web log, the entries are sorted by date (since this is how they were entered). Find all the log entries within a given date range. You may not read more than 32 MB.

A candidate replied with an answer that had the following code:

   1: string line = string.Empty;
   2: StreamReader file;
   3:  
   4: try
   5: {
   6:     file = new StreamReader(filename);
   7: }
   8: catch (FileNotFoundException ex)
   9: {
  10:     Console.WriteLine("The file is not found.");
  11:     Console.ReadLine();
  12:     return;
  13: }
  14:  
  15: while ((line = file.ReadLine()) != null)
  16: {
  17:     var values = line.Split(',');
  18:     DateTime date = Convert.ToDateTime(values[0]);
  19:     if (date.Date >= startDate && date.Date <= endDate)
  20:         output.Add(line);
  21:  
  22:     // Results size in MB
  23:     double size = (GetObjectSize(output) / 1024f) / 1024f;
  24:     if (size >= 32)
  25:     {
  26:         Console.WriteLine("Results size exceeded 32MB, the search will stop.");
  27:         break;
  28:     }
  29: }

My reply was:

The data file is 15TB in size, if the data is beyond the first 32MB, it won't be found.

The candidate then fixed his code. It now includes:

   1: var lines = File.ReadLines(filename);

Yep, this is on a 15TB file.

Now I’m going to have to lie down for a bit, I am not feeling so good.

Reading large codebases

Casey asked me:

Any tips on how to read large codebases - especially for more novice programmers?

As it happens, I think that this is a really great question. I think that part of what makes someone a good developer is the ability to go through a codebase and figure out what is going on. In your career you are going to come into an existing project and be expected to pick up what is going on there. Or, even more nefarious, you may have a project dumped in your lap and expected to figure it out all on your own.

The worst scenario for that is when you are brought in to replace “those incontinent* bastards” that failed the project, and you are expected to somehow get things working. But other common scenarios include being asked to maintain a codebase written by a person who left the company. And finally, of course, if you are using any Open Source projects, there is a strong likelihood that you’ll be asked “can you extend this to also do this”, or maybe you are just curious.

Especially for novice programmers, I would strongly recommend that you do just that. See the rest of the post for actual details on how I do it, but do go ahead and read code.

I usually approach new codebases with a blind eye toward documentation / external details. I want to start without preconceptions about how things work. I try to figure out the structure of the project from the on disk structure. That alone can tell you a lot. I usually try to figure out the architecture from that. Is there a core part of the system? How is it split, etc.

Then I find the lowest level part of the code and just start reading it. Usually in blind alphabetical order. Find a file, read it all the way, next file, etc. I try to keep notes (you can see some examples of those in the blog) about how things are hooked together, but mostly, I am trying to get a feel for the code. There is a lot of code that is usually part of the project style; it can be things like precondition checks, logging, error handling, etc. Those things you can learn to recognize early, and then you can usually just skip them to read the interesting bits.

I usually don’t try to read too deeply at this point, I am trying to get a feeling for the scope of things. This file is responsible for X and does so by calling Y & Z, but it isn’t really important to me to know every little detail at that point. Oh, and I keep notes, a lot of notes. Usually they aren’t really notes but more a list of questions, which I fill in / answer as I understand more. After going through the lowest level I can find, I usually try to do a vertical slice. Again, this is mostly so I can figure out how things are laid out and working. That means that the next time I go through this, I’ll have a better idea about the structure / architecture of it.

Next, I’ll usually head to the interesting bits. The parts of the system that make it interesting to me, rather than something that is off the shelf.

That is pretty much it, there isn’t much to it. I am pretty much just going over the code and trying to first find the shape & structure, then I dive into the unique parts and figure out how they are made.

In the meantime, especially if this is hard, I’ll try to go over whatever documentation exists, if any. At this point, I should have a much better idea about how things are set up, so I’ll be able to go through the docs a lot more quickly.

* I started writing incompetent, but this is funnier.

So, what have YOU been learning lately?

One of the worst things that can happen to you professionally is stagnation. You know what you are doing, you know how it works, and you can coast along very easily. Unfortunately, there is the old saying: it isn’t what we know we don’t know that is going to hurt us. It is what we don’t know that we don’t know that is going to bite us in the end.

One of the reasons that I have routinely been going out and searching for difficult codebases to read has been to avoid that. I know that I don’t know a lot, I just don’t know what I don’t know. So I go into an unfamiliar codebase and try to figure out how things work over there.

I have been doing that for quite some time now. And I am not talking about looking at some sample project a poor schlump put out to show how you can do CQRS with 17 projects to create a ToDo app. I am talking about production code, and usually in areas or languages that I am not familiar with.

A short list of the stuff that I have been gone over:

  • CouchDB (to learn Erlang, actually, but that got me to do DB stuff).
  • LevelDB
  • LMDB
  • NServiceBus
  • Mass Transit
  • SignalR
  • Hibernate
  • Hibernate Search

Those are codebases that do interesting things that I wanted to learn from. Indeed, I have learned from each of those.

Some people can learn by reading academic papers, I find that I learn best from having a vague idea about what is going on, then diving into the implementation details and seeing how it all fits together.

But the entire post so far was a preface to the question I wanted to ask. If you are reading this post, I am pretty sure that you are a professional developer. Doctors, lawyers and engineers (to name a few) have to recertify every so often, to make sure that they are current. But I have seen all too many developers stagnate to the point where they are very effective in their chosen field (building web apps with jQuery Mobile on ASP.Net WebForms 3.5) and nearly useless otherwise.

So, how are you keeping your skills sharp and your knowledge current? What have you been learning lately? It can be a course, or a book or a side project or just reading code. But, in my opinion, it cannot be something passive. If you were going to answer: “I read your blog” as the answer to that question, that is not sufficient, flatterer. Although, you might want to go a bit further and consider that imitation is the sincerest form of flattery, so go ahead and do something.


Some final notes about LMDB review

Okay, having gone through the LMDB codebase with a fine toothed comb, I think that I can safely say that it is both a very impressive codebase and one that dearly needs some TLC. I’ll freely admit that I am by no means a C guy. And it is entirely possible that a lot of the issues that have been bugging me are standard C things. But I don’t think so. Methods that go on for hundreds of lines, duplicated code and a plethora of gotos hardly seem to be the things that pop to mind when I hear good C code.

But beyond my issues with the code, the implementation is really quite brilliant. The way LMDB manages to pack so much functionality by not doing things is quite impressive. Interestingly, you couldn’t write this database even 5 years ago. LMDB relies on being able to map the db into memory, and up until x64 became prevalent, you just couldn’t do that for any db with a meaningful size. With x64 and the effectively unlimited address space we have (will I be laughing at my naivety in a few years?), that is no longer an issue.

I learned quite a lot from the project, and it has been a frustrating, annoying and fascinating experience.

Tags:

Published at

Originally posted at

Comments (16)

Lightning memory-mapped database is Esent

Here is something that suddenly dawned on me as I was going through the LMDB codebase. LMDB is actually how Esent works. In fact, having gone through the LMDB codebase, I am in a much better position to understand how Esent works.

For example, Esent is limited to 16TB per database. I did some math to figure out the max size of an LMDB db that uses 32 bits for the page number, and I got to 2 ** 32 times 4,096 == 16 TB, and that isn’t a coincidence. Then there is all the background information in the Esent API that talks about large values, for example. That is how LMDB does overflow, and tables are implemented as additional B-Trees (what LMDB calls dbs).
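
Just to make that arithmetic explicit, here is my own back of the envelope version of it (nothing here is taken from either codebase):

/* 32 bit page numbers, 4KB pages:
   2^32 pages * 4,096 bytes per page = 2^44 bytes = 16 TB,
   which matches the documented Esent limit. */
#include <stdio.h>
#include <stdint.h>

int main(void)
{
    uint64_t pages     = 1ULL << 32;         /* 32 bit page number */
    uint64_t page_size = 4096;               /* default page size  */
    uint64_t max_db    = pages * page_size;  /* 2^44 bytes         */
    printf("%llu TB\n", (unsigned long long)(max_db >> 40)); /* prints 16 */
    return 0;
}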

At some point I was going through this in my head and realized that indexes also work in exactly the same way, and so does pretty much everything else. What Esent does on top is provide better concurrency (LMDB allows only a single writer), and a whole lot of additional features (schemas, secondary indexes, etc).

But I think that the major difference from my point of view is that I used to have only a theoretical idea about how Esent really worked, under the covers. Now I feel like I actually grok things at a much better level. For instance, Esent will not give back space to the operating system, even if you deleted stuff. I knew that, and just accepted that as how it worked. But now I can understand why it doesn’t do that. Or the MAX_VER_PAGES setting: I knew what its purpose was, and how it impacted operations (it is the amount of changes that can be pending). But after reading about LMDB’s dirty_pages and its limit, I understand the why and the how at a much deeper level.

To be honest, I kept reading the LMDB codebase purely because I wasn’t going to let that code beat me, but it is very educational.

Tags:

Published at

Originally posted at

Comments (27)

Reviewing Lightning memory-mapped database library: MVCC

MVCC stands for Multi Version Concurrency Control. This is how you can have both readers & writers at the same time and not have to arrange locks between the two. With LMDB, MVCC is implemented by always creating a new page, never modifying one in place. That also means that when we “free” a page, we need to make sure not to actually reuse it until all the transactions that could see it have completed.
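
Here is a toy illustration of the idea (my own code, nothing to do with the actual LMDB implementation): a reader keeps the root page it started with, a writer publishes a brand new page, and the reader still sees its old, consistent snapshot.

/* Toy MVCC: never update in place, just flip a pointer to a new page. */
#include <stdio.h>

typedef struct { int value; } page_t;

static page_t  versions[2] = { { 1 }, { 0 } };
static page_t *current_root = &versions[0];

int main(void)
{
    page_t *reader_root = current_root;   /* reader snapshot: sees version 1 */

    /* writer: build a brand new page, then flip the root pointer */
    versions[1].value = 2;
    current_root = &versions[1];

    printf("reader sees %d, writer sees %d\n",
           reader_root->value, current_root->value); /* prints 1 and 2 */
    return 0;
}

The one thing this toy skips is the part this post is actually about: knowing when version 1 can be recycled, which is exactly the “no transaction can still see it” bookkeeping that I can’t follow in the code.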

To be honest, I can’t follow the code. It is somewhat related to me_pghead, but I just can’t follow what is going on. I think that this is related to the way it manages multiple transactions, but I am just unable to follow the code. Maybe it is just that I have overloaded my senses with too much C code. I have been diving into this code, and sometimes it feels like this:

That said, I understand how it has to work, so that should be enough for now. Next, I want to see how to do it myself.

Tags:

Published at

Originally posted at

Comments (5)

Reviewing Lightning memory-mapped database library: What about free pages?

Okay, so in my last post (and last night, from my perspective), I wrote about the process in which LMDB maintains the B-Tree when it grows. But what about when it shrinks? And how does LMDB handle things like reusing space?

For that matter, we need to discuss another thing. As I understand this right now, LMDB is going to write a page once and only once. And when that is done, that page is now immutable. Updates to this page will result in a new page, and the old one will be marked for reuse. So you don’t actually have to worry about updating the page while others may be reading it. In other words, I would expect this code to use a lot of pages:

image

Actually, I would expect it to use two pages all the time. Alternating between the two after every transaction.

Indeed, following the code, it is easy to see that this magic happens during mdb_page_touch. We find (somehow, not sure how yet) a free page (or create a new one), and we mark the old one for reuse. That means that when you actually write to the data, you aren’t really writing to the old page, you are creating a new one. This removes a lot of complexity. However, it does mean that the db will be using more space, since every write transaction will use a minimum of one new page (and probably more, considering that there is the B-Tree to maintain). In fact, on average sized dbs, I would be surprised if a single transaction could write fewer than 3 – 4 pages as a minimum. I’ll test the min number of pages per transaction in a bit, right now I want to focus on the implications of that decision.

Well, to start with, concurrency is easier. We aren’t writing to old data, so we have MVCC. The only thing we need to ensure is that we aren’t reusing a page before all of the transactions that can see it are complete. It also has other implications. I haven’t confirmed my suspicions about transaction commits yet, but because we are writing to new locations, not modifying existing ones, a crash midway would simply leave us at the previous state, and will not corrupt any data.

Since we don’t modify pages, it means that free pages aren’t just things that happen when we delete data, they happen all the time, so it would be interesting to see how LMDB handles them. Here is the relevant piece of the code:

image

So either find me a free page, or allocate a new one. Then add the existing page to the free list for the current transaction. A bit lower in the code we copy the data in the old page to the new one, and we are ready to go.
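
A toy model of that cycle (again my own code, not mdb_page_touch itself) shows the two pages alternating exactly as I expected above:

/* Toy copy-on-write: every "transaction" copies the live page into the
   spare one, applies the update there, and retires the old page. */
#include <stdio.h>
#include <string.h>

#define PAGE_SIZE 4096

static char pages[2][PAGE_SIZE];   /* a pretend file with two pages */
static int  live = 0;              /* which page is current         */
static int  spare = 1;             /* which page may be reused      */

static void write_transaction(const char *value)
{
    int target = spare;                              /* reuse the free page  */
    memcpy(pages[target], pages[live], PAGE_SIZE);   /* copy the old content */
    strncpy(pages[target], value, PAGE_SIZE - 1);    /* apply the update     */
    spare = live;                                    /* old page is now free */
    live = target;                                   /* new page is current  */
}

int main(void)
{
    write_transaction("txn 1");
    write_transaction("txn 2");
    write_transaction("txn 3");
    printf("live page %d holds: %s\n", live, pages[live]); /* page 1, txn 3 */
    return 0;
}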

So, when we make updates, at least in this scenario, we are cycling between two pages, always allocating one and freeing the other, and on the next transaction we go the other way. This is really quite interesting, from the perspective of comparing that to other storage engines. CouchDB works in pretty much the same way, a B-Tree with all the associated benefits. But CouchDB’s model is based on append only, and it cannot reuse the space in the file, which requires compactions. LMDB’s model is much nicer in that regard. Since in place updates are not allowed in either case, there is a lot of space that goes just for those updates. In LMDB’s case, the per-update overhead is bigger, because it works in whole pages, but it can reuse them. In CouchDB’s case, it doesn’t reuse space, so it doesn’t need to work in whole pages (which makes each individual write more space efficient).

Side note: C’s lack of native data structures beyond arrays is really annoying. You realize that in C, a Stack is a design pattern?!

And about the minimum number of pages per transaction, let us see what is going on about that. So I wrote the following code:

image

This should generate a deep enough B-Tree to show what I want. And indeed, the action happens here:

image

Both the root node and the data node are modified, because of the cursor stack. It isn’t really a big deal, to be honest, since usually you would only need to update H pages (where H is the height of the tree) and H is very small for B-Trees.

But this leads me to consider something else. Usually when talking about on disk structures, one of the more important things that you want to keep in mind is reducing seeks. But the B-Tree model that we have here would result in having pages pretty much all over the file. Now, in something like CouchDB, it doesn’t actually matter, because the file is append only, so you always end up scanning from the end of the file backward. But because LMDB is reusing pages, it is entirely possible for a root page to be in the middle of the file, the next level to be in the start and the actual data in the end.

Now, I am guessing that this isn’t actually an issue, because LMDB relies on the OS page cache to handle this, and that would take care of that. In practice, I expect, the branches of the tree are always going to be in memory, since they were the last written and the most accessed.

And that is enough for now. Next, I want to figure out how we return free pages to the system. In order to do that, I think that I’ll need to look at how transactions are committed.

Tags:

Published at

Originally posted at

Comments (11)

Reviewing Lightning memory-mapped database library: A thoughtful hiatus

I thought that I would stop a bit from focusing on what the LMDB code is doing in favor of some observations about the code itself. Going into this codebase is like getting hit in the face with a shovel. Now, this might be my personal experience, as someone who has done a lot of managed code work in the past. But I used to be a pretty horrible C/C++ guy (the fact that I say C/C++ should tell you exactly what my level was).

But I don’t think that it was just that. Even beyond the fact that the code is C, and not C++ (which I am much more used to), there is a problem that only became clear to me well after I read the code for the millionth time. It grew. Looking at the way the code is structured, it looks like it started out as about as nice a C codebase as you can get (don’t get excited, that isn’t saying much). But over time, features were added, and the general structure of the codebase wasn’t adjusted to account for them.

I am talking about things like this:

image

image

image

There are actually 22 (!) ‘if(IS_LEAF(mp))’ references in the codebase.

Or what about this?

image

image

It looks like certain features (duplicate key support, for example) were added that had a lot of implications for the code, but it wasn’t refactored accordingly. That makes it very hard to go through.

Tags:

Published at

Originally posted at

Comments (4)

Reviewing Lightning memory-mapped database library: On page splits and other painful things

image

I don’t know if you noticed, but the LMDB codebase is giving me some serious headaches. The bloody thing is a very dense piece of code, but at the same time, it is also quite interesting. In particular, B-Trees are pretty much The Answer for a lot of on disk data structures, but they tend to be quite hard to handle properly. So I am looking forward to this post, in which I am going to find out whether I can figure out this code. Just to note, mdb_page_split is another 400 line method with gotos sprinkled all over.

In order to do that, I wrote the following code (in a single transaction):

image

And then I spent another few minutes trying to get it to compile. Mostly because I started out with “for (int i=0; …” and C doesn’t allow that (blah!).

Anyway, I got the page to split, and now I want to see how it actually behaves. I am currently at the first part where we take the root (currently leaf) page and turn it into a branch. This is an interesting case of a root split.

We start with:

image

And the first thing we do is to go to this mode:

image

We add an empty implicit pointer to the previous root. But this is really just the beginning, now we need to divide the entries that used to be in the root between the left & right sides. This is made complex by the fact that you might have it set up so it has a few large & small values, so just cutting them in the middle would produce a result that would be too big. At least, that is what a comment says. I am not sure how that can be. We are going from one page to two pages, so I don’t see how you can get into a situation where that would happen. I guess it is time to dig into more code.

Okay, I got it, the issue is that we do a page split because we need to add a new item. So that is the reason we have to jump through those hoops. Because we are adding a new item (one that is already too big for the original page, which is why we are actually splitting it).

Another complex issue with page splits is that they might be recursive. If we need to split a page, we need to add an entry to the parent page, which might cause that page to split, etc…

An interesting observation on the sizes involved: an LMDB page has 4,084 bytes available for storage. The page number is 8 bytes long (it uses pointer sized page numbers), and assuming 16 byte keys (and including about 8 bytes for the node header), we get about 128 keys per branch page. Even considering that B-Trees are specifically designed to make themselves efficient in the presence of memory hierarchies, it is quite impressive.

Consider, assuming a full tree, that if we hold just the root and the first level in memory, we would consume about 512KB. And that would give us just one seek to get any of ~2 million items. As an aside, one reason that I like reading this code is that for once, this is a B-Tree implementation that isn’t covered in locking code, which is how this usually works in an RDBMS.
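
Working those numbers through (my arithmetic, using the sizes quoted above):

/* Rough fan-out arithmetic for a 4,084 byte branch page with
   16 byte keys, 8 byte page numbers and ~8 bytes of node header. */
#include <stdio.h>

int main(void)
{
    int usable    = 4084;                /* bytes available in a page  */
    int per_entry = 16 + 8 + 8;          /* key + page number + header */
    int fanout    = usable / per_entry;  /* ~127 entries per branch    */

    long cached_pages = 1 + fanout;                      /* root + first level       */
    long cache_bytes  = cached_pages * 4096;             /* what we keep in memory   */
    long reachable    = (long)fanout * fanout * fanout;  /* one seek past the cache  */

    printf("fanout: %d\n", fanout);                 /* ~127       */
    printf("cache:  %ld KB\n", cache_bytes / 1024); /* ~512 KB    */
    printf("items:  %ld\n", reachable);             /* ~2 million */
    return 0;
}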

Another aspect that is quite interesting is that this code really drives home important aspects of working with relational databases. It is all about keeping things inside the same page, with spill over to overflow pages slowing things down because you need to do another seek. Or a really obvious explanation of why page splits are such a bad thing. And a lot of other details that you learn when you go a bit deeper into relational databases, but that (at least for me) were never quite real before I started dealing with building databases myself.

Tags:

Published at

Originally posted at

Comments (15)

Reviewing Lightning memory-mapped database library: On disk data

As mentioned, the data in LMDB is actually stored in pages. And I am currently tracking through the process in which we add a new item to the database. The first thing that happens is that we allocate a leaf page. I think that I’ll have to go over how pages are allocated now, which should explain a lot about how LMDB reuses disk space.

A new page is allocated by first finding the oldest transaction that is still active. This is how LMDB implements MVCC. Basically, any free page that is older than any existing transaction is eligible for reuse. The first db (although a better term would be btree, or maybe table) contains the free pages, and at first we search there for an available page. If we can’t find such a page, we use the end of the file instead. This happens in mdb_page_alloc.
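
In a simplified form, that allocation rule looks something like this (a sketch of my own, not the real mdb_page_alloc; page number 0 here is just a sentinel for a consumed free list entry):

/* Take a page from the free list if every active reader is already past
   the transaction that freed it, otherwise grow the file by one page. */
#include <stddef.h>

typedef unsigned long pgno_t;
typedef unsigned long txnid_t;

typedef struct {
    pgno_t  pgno;        /* 0 means the entry has been consumed */
    txnid_t freed_by;    /* transaction that released the page  */
} free_entry;

pgno_t alloc_page(free_entry *free_list, size_t count,
                  txnid_t oldest_reader, pgno_t *last_pg)
{
    for (size_t i = 0; i < count; i++) {
        /* safe to reuse only if no live transaction can still see it */
        if (free_list[i].pgno != 0 && free_list[i].freed_by < oldest_reader) {
            pgno_t p = free_list[i].pgno;
            free_list[i].pgno = 0;
            return p;
        }
    }
    return ++(*last_pg);   /* nothing reusable: take a page at the end of the file */
}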

A very interesting aspect is the fact that LMDB allows direct memory writes (with the additional risk of corrupting the db if you mess up the data). Interestingly, this is done in the following code:

image

If the options allow direct memory writes, you get a pointer into the mmap’ed file. Otherwise, LMDB will allocate a page (or re-use one that has already been allocated).

I am not really sure what is going on with the insert yet. This is a function pointer (a delegate, in C# terms). And it selects which behavior will happen later on. I am just not sure what those two different functions do yet.

I think I got it, the key is here:

image

You can see that we use the insert method to add the mid variable to the dirty list. So if we give you a direct pointer, we append it to the list. But if we allocate a page for you, we need to add it in order.

The mdb_mid2l_insert will add an item to the list in the appropriate location to ensure sorting. I think that this is done so that when we actually write the dirty pages to disk, if we do that using file I/O, we will do it in ascending file order, so we can get good seek times from the disk. A new page has 4,084 bytes available for it (12 bytes are taken by header data).
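
The sorted insert itself is simple enough; this is roughly the shape of it (my own simplification, not the mdb_mid2l_insert code):

/* Keep the dirty page list sorted by page number, so the eventual
   writes walk the file in ascending order. */
#include <stddef.h>

typedef struct {
    unsigned long pgno;   /* page number in the file    */
    void         *ptr;    /* in-memory copy of the page */
} dirty_entry;

/* list[0 .. *count-1] is sorted by pgno; insert keeps it that way */
void dirty_list_insert(dirty_entry *list, size_t *count, dirty_entry item)
{
    size_t i = *count;
    while (i > 0 && list[i - 1].pgno > item.pgno) {
        list[i] = list[i - 1];   /* shift larger entries one slot up */
        i--;
    }
    list[i] = item;
    (*count)++;
}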

A database is actually created the first time data is written to it. The root page is allocated and recorded. And then the data is added to the page.

As you might recall, data inside a page is kept in a sorted list. But remember that the data is also stored on the page, and there is the whole notion of overflow pages, so I think that I am going to have to draw a picture to figure out what is going on.

image

This is more or less how it works. But I don’t think that I really get it. Note that there are two collections here. The first is the positions of the nodes in the page, and the second is the nodes themselves. The reason we have this complexity is that we need to maintain both of them in an efficient manner. And we need to do that without too much copying. Therefore, the list of node positions is at the beginning of the page, growing downward, and the actual nodes are at the end, growing upward. The page is full when you can’t fit anything between those two lists.
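
This is how I ended up picturing the layout. The struct below is my own sketch (the field names are mine, not LMDB’s), just to make the “two lists growing toward each other” idea concrete:

#include <stdint.h>

#define PAGE_SIZE 4096

typedef struct {
    uint16_t num_nodes;   /* how many entries are on the page              */
    uint16_t lower;       /* end of the offsets array (grows toward upper) */
    uint16_t upper;       /* start of the node data   (grows toward lower) */
    /* uint16_t offsets[];   sorted node positions, at the start of the page */
    /* ... free space ...                                                    */
    /* node data (key + value), packed at the end of the page                */
} page_header;

/* the page is full when the offsets array and the node data meet */
static int page_has_room(const page_header *h, uint16_t node_size)
{
    return (uint16_t)(h->upper - h->lower) >= node_size + sizeof(uint16_t);
}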

A node, by the way, is the LMDB internal name for a key (and maybe data). But I think that I might have an error here, because I don’t see the code here to actually add nodes to the page in a sorted fashion, so it might be doing a linear search on the content of a page. I am now tracking through the code for adding a second value into a database. And I think that I have to be wrong about the linear node search. The code I was looking at was invoked at the new db creation section, so it might be taking shortcuts that aren’t generally available.

At any rate, the logic for adding an item goes like this… first we need to find the appropriate page. And we do this by starting from the root and asking for the actual page. Which gets us to mdb_page_get, where we have the following:

image

The really interesting part here is that each transaction has a dirty list of pages, so if you modified a page, you’ll get your own version, even before the data is committed. Otherwise, we will just hand you the current version. This is quite nice.

And then we get to the already familiar mdb_page_search_root function, and I can confirm that the nodes in a page are actually sorted, which only makes sense. I am not sure where that happens yet, but I have got the feeling that I am going to see it in a bit.

Okay, I am going to go on a bit of a rant here, mostly against C, to be honest.

image

Take a look at this code. mdb_page_search does something, but what it does is mutate the state of the cursor (mc), and you are then going to access mc->mc_pg[mc->mc_top] to get the actual result you wanted. To make things more interesting, there is also mc->mc_ki, which is the index of the node inside the page. And it drove me crazy, because I just couldn’t see the association between those three values, especially because I am so not used to this type of code that I never even considered this as a possibility.
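
For my own sanity, here is the mental model I settled on. The struct is a simplified sketch of mine (the field names match the ones mentioned above, but the real cursor has more to it):

#define CURSOR_STACK 32

typedef struct MDB_page MDB_page;    /* opaque here, just for the sketch */

typedef struct {
    MDB_page *mc_pg[CURSOR_STACK];   /* the page at each level of the descent    */
    unsigned  mc_ki[CURSOR_STACK];   /* the node index inside that page          */
    unsigned  mc_top;                /* current level; mc_pg[mc_top] is the leaf */
} cursor_sketch;

/* after a search, the values you care about are:
     MDB_page *page  = mc->mc_pg[mc->mc_top];
     unsigned  index = mc->mc_ki[mc->mc_top];    */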

At any rate, now I know how it gets to insert things in the right order. When doing mdb_page_search, it calls mdb_node_search, which does the appropriate setup to tell the next call where it needs to actually do the insert to get things in a sorted fashion. I am currently in mdb_cursor_put, which is thrice cursed and really hard to follow. A 400+ line method with gotos galore.

But I am in the section where we are actually going to be writing a new value (the new_sub: goto label, if you care). And the code is pretty straightforward from there.

Next up, I want to see how it handles a page split, but that is a subject for another post.

Tags:

Published at

Originally posted at

Comments (10)

Reviewing Lightning memory-mapped database library: Stepping through make everything easier

Okay, I know that I have been critical about the LMDB codebase so far. But one thing that I really want to point out for it is that it was pretty easy to actually get things working on Windows. It wasn’t smooth, in the sense that I had to muck around with the source a bit (hack endianness, remove a bunch of unix specific header files, etc). But that took less than an hour, and that was pretty much it. Since I am by no means an experienced C developer, I consider this a major win. Compare that to leveldb, which flat out won’t run on Windows no matter how much time I spent trying, and this is a pleasure.

Also, stepping through the code I am starting to get a sense of how it works that is much different than the one I had when I just read the code. It is like one of those 3D images, you suddenly see something.

The first thing that became obvious is that I totally missed the significance of the lock file. LMDB actually creates two files:

  • lock.mdb
  • data.mdb

Lock.mdb is used to synchronize data between different readers. It seems to mostly be there if you want to have multiple writers using different processes. That is a very interesting model for an embedded database, I have to admit. Not something that I think other embedded databases are offering. In order to do that, it creates two named mutexes (one for read and one for write).

A side note on Windows support:

LMDB supports Windows, but it is very much a 2nd class citizen. You can see it in things like a path not found error turning into a no such process error (because it tries to use GetLastError() codes as C error codes), or when it doesn’t create a directory, even though not creating it means things will fail.

I am currently debugging through the code and fixing such issues as I go along (but no, I am just doing heavy handed magic fixes to get past this stage to the next one; otherwise I would have sent a pull request).

Here is one such example. Here is the original code:

image

But ReadFile in Win32 will return false if the file is empty, so you actually need to write something like this to make the code work:

image
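
The screenshots didn’t survive in this version of the post, but the shape of the workaround is roughly this (my reconstruction, not the actual change I made; read_header is a name I made up):

/* When reading the meta page, treat a failed read (or zero bytes read)
   on a brand new, empty file as "no environment header yet" rather than
   as an I/O error. */
#include <windows.h>

static int read_header(HANDLE file, void *buf, DWORD len, DWORD *bytes_read)
{
    *bytes_read = 0;
    if (!ReadFile(file, buf, len, bytes_read, NULL)) {
        if (GetFileSize(file, NULL) == 0)
            return 0;        /* empty file: new environment, not an error */
        return -1;           /* a real I/O failure                        */
    }
    if (*bytes_read == 0)
        return 0;            /* also treat EOF on an empty file as "new"  */
    return 1;                /* we got a header to validate               */
}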

Past that hurdle, I think that I understand a lot more about the way LMDB works than before.

Let us start with the way data.mdb works. It is important to note that for pretty much everything in LMDB we use the system page size. By default, that is 4KB.

The data file starts with 2 pages allocated. Those pages contain the following information:

image

Looking back at how CouchDB did things, I am pretty sure that those two pages are going to be pretty important. I am guessing that they would always contain the root of the data in the file. There is also the last transaction on them, which is what I imagine determines how something gets committed. I don’t know yet; as I said, I am guessing based on how CouchDB works.

I’ll continue this review in another time. Next time, transactions…

Tags:

Published at

Originally posted at

Comments (17)

Reviewing Lightning memory-mapped database library: going deeper

Okay, I now have a pretty rough idea about how the codebase actually works. I still think that the codebase is quite ugly. For example, take a look at this:

image

The len parameter for CreateFile holds the flag that decides whether to open or create the file, or just open it (read only). But why is it in a parameter called len?

Probably because it was just there, and it would be a sin to create another local variable just for this, I guess (in a codebase where one method had > 18 local variables!). To make things more interesting, in the rest of this method, this is actually used as a string length variable, sigh.

At any rate, let us actually dig deeper now. The following structure holds data about a db:

image

This is actually somewhat misleading, at least with regards to how I would think about a db. This is the entry point for all the pages that belong to a specific db. But a db in LMDB is not really the same thing as a db in SQL Server or RavenDB. They all reside in the same file, and you always have at least two. The first one is the free db, which is used to track all the free pages. The second one is the main db. Then you have additional, named databases.

This is used here:

image

This defines the metadata for the entire environment. Note that we have the two dbs there in mm_dbs. The mm_txnid denotes the last committed transaction id. This value is what gives LMDB its MVCC support. The mm_last_pg value is the last used page; any transaction that wants to write will start writing from that point.
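
Since the screenshot is gone, here is a simplified sketch of what that metadata looks like, limited to the fields discussed above (the real structure has more members, and db_info / meta_sketch are my names, not LMDB’s):

typedef unsigned long pgno_t;
typedef unsigned long txnid_t;

typedef struct {
    pgno_t md_root;        /* root page of this db's B-Tree            */
    /* ... depth, entry counts, flags, etc ...                          */
} db_info;

typedef struct {
    db_info mm_dbs[2];     /* [0] = free db, [1] = main db              */
    pgno_t  mm_last_pg;    /* last used page; writers continue from it  */
    txnid_t mm_txnid;      /* last committed transaction id (MVCC)      */
} meta_sketch;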

Let us see how we deal with pages here, shall we?

image

The first part tries to find a dirty page, if we are in a read/write transaction and we haven’t specified that we can write directly to memory. This is done by doing a binary search on the list of dirty pages.

Otherwise, we can just hand the user the actual page by accessing it directly.

Next, let us look at where this is actually used. Searching for a page with a specific key in it. This is done mostly in mdb_node_search.

image

This seems to be doing a binary search for the keys inside a specific page (in this case, the page that is at the top of the stack on the cursor). That leads to the conclusion that pages internally store their data as sorted arrays.

And this leads me to another pet peeve with this code base. Take a look at this line:

image

Sure, this is a well known trick to cheaply divide a number by two, but are you seriously telling me that the compiler isn’t going to optimize (low + high) / 2? To my knowledge, no C compiler updated in the last 10 – 15 years manages to miss this optimization. So why write code that is going to be harder to read?
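
For reference, here are both spellings of the midpoint in a plain binary search over sorted keys (illustration only, not the LMDB code; overflow on low + high isn’t a concern here, since the count is bounded by what fits in a page):

/* Binary search over a sorted array of keys, the way a page lookup would work. */
static int find_key(const int *keys, int count, int wanted)
{
    int low = 0, high = count - 1;
    while (low <= high) {
        int mid = (low + high) >> 1;    /* exactly the same as (low + high) / 2 */
        if (keys[mid] == wanted)
            return mid;                  /* found: index of the key   */
        if (keys[mid] < wanted)
            low = mid + 1;               /* look in the upper half    */
        else
            high = mid - 1;              /* look in the lower half    */
    }
    return -1;                           /* not found                 */
}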

Okay, so now we know how we search for a specific key inside a page, but how do we get to the actual page that we want to search on? This happens on mdb_page_search_root. Let me see if I can take it apart.

When this method is called, the cursor is set up so the first page on the pages stack is the root page.

And… that is enough for me. Up until now, I have been trying to just read the code. Not execute it, not debug through it, nothing. Just go over the code one line at a time and figure out what is going on. I actually think that I have a really good grasp of what is going on in there, but I think that this is pretty much all I can do at this point from just reading the code. So I am going to stop now and set up a debug environment so I can work with it, and report my findings from stepping through the code.

Tags:

Published at

Originally posted at

Comments (13)