Ayende @ Rahien

Hi!
My name is Oren Eini
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969


Decompression code & Discussion


As I said, this is a good, small interview question. It is short and relatively simple to handle, but it should show a lot of things about your code. To start with, you are faced with a non-trivial task that most people are not that familiar with.

Implement a Decompress(Stream input, Stream output, byte[][] dictionary) routine for the following protocol:

Prefix byte – composed of the highest bit + 7 bits len

If the highest bit is not set, this is uncompressed data; copy the next len bytes from the input to the output.

If the highest bit is set, this is compressed data, the next byte will be the index in the dictionary, copy len bytes from that dictionary entry to the output.

I couldn’t resist doing this myself, and I came up with the following:

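public void Decompress(Stream input, Stream output, byte[][] dictionary)
{
    // len is only 7 bits wide, so one reusable buffer covers any literal block
    var tmp = new byte[128];
    while (true)
    {
        var readByte = input.ReadByte();
        if (readByte == -1)
            break; // clean end of input between blocks

        var prefix = (byte) readByte;
        var compressed = (prefix & 0x80) != 0;
        var len = prefix & 0x7f;

        if (compressed == false)
        {
            // Read may return fewer bytes than requested, so loop until the block is complete
            while (len > 0)
            {
                var read = input.Read(tmp, 0, len);
                if(read == 0)
                    throw new InvalidDataException("Not enough data to read from compressed input stream");
                len -= read;
                output.Write(tmp, 0, read);
            }
        }
        else
        {
            readByte = input.ReadByte();
            if(readByte == -1)
                throw new InvalidDataException("Not enough data to read from compressed input stream");
            // Reject references that point outside the dictionary or past the end of an entry
            if (readByte >= dictionary.Length || len > dictionary[readByte].Length)
                throw new InvalidDataException("Invalid dictionary reference in compressed input stream");
            output.Write(dictionary[readByte], 0, len);
        }
    }
}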

Things to pay attention to: Low memory allocations, error handling, and handling of partial reads from the stream.

But that is just part of the question. After reading the protocol and implementing it, the question turns to what the protocol says about this kind of compression scheme. The use of just 7 bits to store len drastically limits the compression utility in a general format. It also requires an external dictionary, which most compression formats don’t use; they use the actual compressed text itself as the dictionary. Of course, I’ve been reading compression algorithms for a while now, so that isn’t that fair. But I would expect people to note that the 7 bit len limits the compression usability.

And hopefully, with a bit of a hint, they should note that the external dictionary is useful for small data sets where the repetitions are actually between entries, not within a single entry.

Interview challenges: Decompress that


I’m always looking for additional challenges that I can ask people who interview at Hibernating Rhinos, and I ran into an interesting challenge just now.

Implement a Decompress(Stream input, Stream output, byte[][] dictionary) routine for the following protocol:

Prefix byte – composed of the highest bit + 7 bits len

If the highest bit is not set, this is uncompressed data; copy the next len bytes from the input to the output.

If the highest bit is set, this is compressed data, the next byte will be the index in the dictionary, copy len bytes from that dictionary entry to the output.

After writing the code, the next question is going to be: what are the implications of this protocol? What is it going to be good for? What is it going to be bad for?

The dark sides of Lucene


I’ve been using Lucene for the past six or seven years, and after my last post, I thought it would be a good idea to talk a bit about the kind of things that it isn’t doing well. We’ve been using it extensively in RavenDB for the past 5 years, and I think that I have a pretty good understanding of it. We used to have one of Lucene.NET committers working at Hibernating Rhinos, so I’ve a high level of confidence that I’m not just stupidly not using it properly, too.

Probably the part that caused us the most pain with Lucene was the fact that it isn’t transactional. That is, it is quite easy to get into situations where the indexes are corrupted. That makes it… challenging to use in a database that needs to ensure consistency. The problem is that it is really not a use case that Lucene is well suited for. In order to ensure that data is saved, we have to commit often. But in order to ensure good performance, we want to commit less often, and then we will lose the changes if we crash. For that matter, Lucene doesn’t make any attempt to actually flush the data properly, relying on the OS to do that, so a system crash can cause you to lose data even though you “committed” it.

Fun times, I can tell you that.

Next, we have the issue of what Lucene calls updates. Updates in Lucene are actually just delete/add, and they don’t maintain the same document id (more on that later). Because of that, you usually have to have an additional field in the index that would be your primary key, and you handle updates by first deleting then adding things. That is quite strange, to be fair, and it means that you can’t “extend” an index entry, you have to build it from scratch every time.

Speaking of this, let us talk a bit about deletes. Ignoring for the moment the absolutely horrendous decision to do deletes through the reader, let us talk about how they are actually done. Deletes are recorded in a separate file, and that means that the moment you have any deletes (or, as I mentioned, updates), all the internal statistics are wrong. We run into this quite often with RavenDB when we are doing things like facets or suggestions. For example, if you request a suggestion for a user name, it will happily give you suggestions for deleted users, even though we deleted them in Lucene.

It will go away eventually, when it is ready to optimize the index by merging all the files, but in the meantime, it makes for interesting bug reports. Speaking of merging, that is another common issue that you have to deal with. In order to ensure optimal performance, you have to be on top of the merge policy. This results in some interesting issues. For RavenDB’s purposes, we do a writer commit after every indexing batch. That means that if you are writing to RavenDB slowly enough, we do a commit after every document write. That results in a lot of segments, and the merge policy would have to do a lot of merges. The problem here is that merges have two distinct costs associated with them.

First, and obviously, you are going to need to write (again) all of the documents in all of the segments you are merging. That is very similar to doing merges in LevelDB (indeed, in general Lucene’s file format is remarkably similar to SST). Next, and arguably more interesting / problematic from our point of view, is the fact that it also kills all of the caches. Let me try to explain. Lucene uses a lot of caches to speed things up; in fact, most of the sorting is done using the caches. That works really well when we are querying normally, because segments are immutable, which makes for great caching. But on a merge, not only have we just invalidated all of our caches, we now need to read, again, all of the data that we just wrote, so we will be able to use it. That can be… costly. And both things can introduce stalls into the system.

The major problem externally with merges is that the document id changes, and that means that you cannot rely on them. It would be much easier if you could send an id out into the world, and get it back later and do something with it, but that isn’t possible with Lucene.

Next, and not really an operational issue like the rest, Lucene’s multi threaded behavior is… a hammer to an egg, in most cases. By that I mean code like this:

image

I mean, it is certainly functional, but it is pretty ugly.

Now, don’t get me wrong, I think that Lucene is pretty neat. But there are some really dark corners there. For example, the actual searching, go ahead and try to find where that is done in Lucene. It is very easy to get lost between all of the different aspects: weights, sorters, queries and various enumerators. For fun, a lot of that runs at hard to figure out times, making the actual query run time interesting to try to figure out.

As a good example, let us take the simplest possible query, TermQuery. Go ahead, try to find where it is actually doing the query for matching terms in this code: https://github.com/apache/lucene/blob/LUCENE_2_1/src/java/org/apache/lucene/search/TermQuery.java

That actually happens here: https://github.com/apache/lucene/blob/LUCENE_2_1/src/java/org/apache/lucene/search/TermScorer.java#L79, and it is effectively a side effect of calling reader.termDocs(term), which limits the matches only to those with the same term. Trying to track down where exactly things happen can be… interesting.

Anyway, this post is getting too long, and I want to get back to figuring out how Lucene does its thing without dwelling too much in the dark…

What Lucene does, a look under the hood


Lucene is a search engine library, which is great. But as it turns out, there is a lot going on there. After working with it for several years, I can say with confidence that it is a pretty awesome library. But surprisingly, a lot of the effort that went into it doesn’t seem to be talked about / visible to people not trawling through the code. I think that this is a pretty good testament to how successful it is. That, and the fact that it is now the baseline against which all other search libraries & engines are compared.

What I wanted to talk about today was the kind of things that Lucene is doing that don’t seem to get much publicity. I think that Spolsky said it best:

Back to that two page function. Yes, I know, it's just a simple function to display a window, but it has grown little hairs and stuff on it and nobody knows why. Well, I'll tell you why: those are bug fixes. One of them fixes that bug that Nancy had when she tried to install the thing on a computer that didn't have Internet Explorer. Another one fixes that bug that occurs in low memory conditions. Another one fixes that bug that occurred when the file is on a floppy disk and the user yanks out the disk in the middle. That LoadLibrary call is ugly but it makes the code work on old versions of Windows 95.

I remember just how much impact that article made on me at the time. And Lucene’s codebase bears out these words. Lucene is a search engine library, which basically means that it does:

  • Indexing
  • Querying
  • Maintenance

One of the major areas of maturity in Lucene is how it optimized indexing. You can see it in the code. For example, Lucene goes to a great deal of trouble to avoid allocating memory willy nilly. Instead, pretty much everything there is done via object pools. This helps reduce the memory pressure when doing a lot of indexing and can save a lot of GC cycles.

Another is the concept of multiple threads for indexing. A lot of Lucene is built around this idea; it has a lot of per thread state that is meant to ensure that you don’t have to deal with concurrency yourself. The idea is that you can take an IndexWriter and write to it concurrently, then call commit. A lot of the work to do with indexing is CPU intensive, so that makes a lot of sense, and Lucene nicely isolates you from all of that work. There is DocumentWriterPerThread, so you can see really nice scaling effects as you throw more threads & hardware at the problem.

Usually, when people start messing with Lucene, they do that by writing analyzers, and you are sort of exposed to the memory constraints by being encouraged to use ReusableTokenStream, etc. It also has a nice pipeline architecture for doing the indexing work with filters.

On the querying side, Lucene does a lot of work to ensure that things just work. It has a Boolean Model for searches, and a Vector Space Model for ranking. Writing your own Query classes is pretty easy too, once you understand how things work, and again, this is another common place for people to extend Lucene. But there is a lot going on behind the scenes. Lucene does a lot of caching on a segment basis, and it is quite nice: since segments are immutable, you can get pretty good usage out of that.

That gives it a lot of its speed, and it means that over time, things are actually going to be faster, because more parts of the segments are in memory and cached.

Finally, we have all of the other work that Lucene does. In practice, it means things like merging segments (hopefully in the background), and keeping the overall system humming along. Unfortunately, that is also one of the most common places for people to start tinkering when they run into perf problems. That is anything but trivial, and optimizing it is something that requires a lot of expertise and understanding about the specific scenario you have.

And on top of that, you have everything else that already works on top of Lucene. Which is quite a lot.

As I said earlier, that is a very impressive piece of technology. That doesn’t mean that it doesn’t have its own set of problems, but that is something that I’ll discuss in detail in my next post.

Sorting with Lucene


I talked about the Lucene formula and how it calculates things using tf-idf for best matches. Now I want to talk about the actual sorting implementation. As it turned out, the default sorting (by relevancy) is really simple. All you need is to get the relevant score for a query, then you shove the results through a heap with a specified size. The heap will take care of maintaining the top results.
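As an illustration of the heap idea (not Lucene’s actual collector code), here is a minimal Go sketch. The hit type and the topN function are names I made up; the technique is a bounded min-heap, where the weakest of the current top results sits at the root and is replaced whenever a better hit arrives.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// hit is a hypothetical (document, score) pair produced by a query.
type hit struct {
	doc   int
	score float64
}

// minHeap keeps the lowest scoring hit at the root so it can be evicted cheaply.
type minHeap []hit

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].score < h[j].score }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(hit)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// topN returns the n best hits, best first, holding at most n hits in memory.
func topN(hits []hit, n int) []hit {
	if n <= 0 {
		return nil
	}
	h := &minHeap{}
	for _, c := range hits {
		if h.Len() < n {
			heap.Push(h, c)
		} else if c.score > (*h)[0].score {
			(*h)[0] = c // replace the current weakest hit
			heap.Fix(h, 0)
		}
	}
	out := append([]hit(nil), *h...)
	sort.Slice(out, func(i, j int) bool { return out[i].score > out[j].score })
	return out
}

func main() {
	hits := []hit{{0, 0.2}, {1, 0.9}, {2, 0.5}, {3, 0.7}}
	fmt.Println(topN(hits, 2))
}
```

The memory cost depends only on the requested page size, not on the number of matching documents, which is why relevancy sorting is the cheap case.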

So far, that is pretty simple to understand. But the question is, how do you do sorting on a field value? The answer is, not easily.

image

GetStringIndex() does something very interesting. It returns a string index, which gives us:

  • A string array containing all the distinct (sorted) values for this field.
  • An int array with an entry for every document in the index, holding the position of that document’s field value in the string array.

Now we can compare documents by the position of their value in the sorted array, which gives us pretty good sorting. Unfortunately, this also requires us to load all the values into memory. Let us see another example, which would probably be easier to follow:

Sorting by an integer is done like this:

image

We get an array (whose size matches the number of documents) holding the field value for each document. We can then sort things easily, because accessing the relevant field value only requires us to use the document id as an index into the array.

The reason Lucene does this is that it uses an inverted index, which maps field values to the documents that contain them, and it has no easy way of going the other way, from a document to its field value. So it is easier to read all the values into memory and work with them there. I don’t like it, but off hand, I can’t think of a better way to handle this.
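Here is a rough Go sketch of the string index structure described above. The type and function names are mine, and for simplicity the sketch is handed the value of each document directly, whereas Lucene has to build the same arrays by walking the terms of the inverted index.

```go
package main

import (
	"fmt"
	"sort"
)

// stringIndex mirrors the two arrays described in the text.
type stringIndex struct {
	lookup []string // distinct field values, sorted
	order  []int    // order[doc] = position of that doc's value in lookup
}

// buildStringIndex takes the field value of each document (indexed by doc id).
func buildStringIndex(valueByDoc []string) stringIndex {
	seen := map[string]bool{}
	var lookup []string
	for _, v := range valueByDoc {
		if !seen[v] {
			seen[v] = true
			lookup = append(lookup, v)
		}
	}
	sort.Strings(lookup)

	pos := make(map[string]int, len(lookup))
	for i, v := range lookup {
		pos[v] = i
	}
	order := make([]int, len(valueByDoc))
	for doc, v := range valueByDoc {
		order[doc] = pos[v]
	}
	return stringIndex{lookup: lookup, order: order}
}

// sortDocs orders doc ids by field value using integer comparisons only.
func sortDocs(idx stringIndex, docs []int) {
	sort.SliceStable(docs, func(i, j int) bool {
		return idx.order[docs[i]] < idx.order[docs[j]]
	})
}

func main() {
	idx := buildStringIndex([]string{"oren", "ayende", "oren", "rahien"})
	docs := []int{3, 2, 1, 0}
	sortDocs(idx, docs)
	fmt.Println(idx.lookup, idx.order, docs)
}
```

Once the arrays exist, comparing two documents never touches a string. The price is that both arrays have to sit in memory, one slot per document.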

The Lucene formula: TF & IDF


The basis of how Lucene works is tf–idf, short for term frequency–inverse document frequency. You can read about it here. It is pretty complex, but it is explained in detail in the Lucene docs. It all boils down to:

image

For now, I want to look at the tf() function. The default implementation is in DefaultSimilarity.Tf, and is defined as:

image

That isn’t really helpful, however. Not without knowing what freq is.  And I’m pretty sure that Sqrt isn’t cheap, which probably explains this:

image

So it caches the score function, and it appears that the tf() is purely based on the count, not on anything else. Before I go and find out what is done with the score cache, let’s look at the weight value. This is calculated here:

image

And idf stands for inverse document frequency.  That is defined by default to be:

image

And the actual implementation as:

image

And the caller for that is:

image

So, we get the doc frequency of a term (that is, in how many documents this term shows up) and the number of all the documents, and that is how we calculate the idf.
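Since the code above survives only as images, here is a small Go sketch of the two functions as I understand the classic DefaultSimilarity to define them: tf as the square root of the in-document frequency, and idf as log(numDocs / (docFreq + 1)) + 1. Treat the exact formulas as my reading of that version rather than a quote of the code.

```go
package main

import (
	"fmt"
	"math"
)

// tf grows with the number of times a term appears in a document,
// but only as a square root, so repeating a word has diminishing returns.
func tf(freq float64) float64 {
	return math.Sqrt(freq)
}

// idf is higher for terms that appear in fewer documents.
// docFreq is the number of documents containing the term,
// numDocs is the total number of documents in the index.
func idf(docFreq, numDocs int) float64 {
	return math.Log(float64(numDocs)/float64(docFreq+1)) + 1.0
}

func main() {
	fmt.Println(tf(4))       // 2
	fmt.Println(idf(9, 10))  // 1
	fmt.Println(idf(1, 10) > idf(5, 10)) // true: rarer terms weigh more
}
```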

So far, so good. But how about that doc frequency? This is one of the things that we store in the terms info file. But how does that get calculated?

I decided to test this with:

image

This should generate 3 terms, two with a frequency of 1 and one with a frequency of 2. Now, to figure out how this is calculated. That part is a real mess.

During indexing, there is a manually maintained hash table that contains information about each unique term, and when we move from one document to another, we write the number of times each term appeared in the document. For fun, this is written to what I think is an in memory buffer for safe keeping, but it is really hard to follow.

Anyway, I now know enough about how that works for simple cases, where everything happens in a single segment. Now let us look at what happens when we use multiple segments. It is actually quite trivial. We just need to sum the frequency of each term across all segments. This gets more interesting when we involve deletes. Because of the way Lucene handles deletes, it can’t really handle this scenario, and deleting a document does not remove its frequency counts for the terms that it had. That is the case until the index does a merge, and fixes everything that way.
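A toy model in Go may make the delete behavior easier to see. Everything here (the segment type, docFreq, merge) is a simplification of my own and not Lucene’s data structures; it only reproduces the behavior described above: counts are fixed when a segment is written, deletes are tracked on the side, and only a merge brings the counts back in line.

```go
package main

import "fmt"

// segment is a toy immutable segment: its per-term document counts are
// computed once, and deletes are recorded separately.
type segment struct {
	docs    [][]string     // terms of each document
	docFreq map[string]int // term -> number of documents containing it
	deleted map[int]bool   // doc numbers marked as deleted
}

func newSegment(docs [][]string) *segment {
	s := &segment{docs: docs, docFreq: map[string]int{}, deleted: map[int]bool{}}
	for _, terms := range docs {
		seen := map[string]bool{}
		for _, t := range terms {
			if !seen[t] {
				seen[t] = true
				s.docFreq[t]++
			}
		}
	}
	return s
}

// docFreq sums the stored counts across segments; it never looks at deletes.
func docFreq(segs []*segment, term string) int {
	total := 0
	for _, s := range segs {
		total += s.docFreq[term]
	}
	return total
}

// merge rewrites only the live documents into a new segment,
// which is what finally corrects the statistics.
func merge(segs []*segment) *segment {
	var live [][]string
	for _, s := range segs {
		for i, d := range s.docs {
			if !s.deleted[i] {
				live = append(live, d)
			}
		}
	}
	return newSegment(live)
}

func main() {
	a := newSegment([][]string{{"raven", "db"}, {"raven"}})
	b := newSegment([][]string{{"raven", "lucene"}})
	segs := []*segment{a, b}

	a.deleted[1] = true
	fmt.Println(docFreq(segs, "raven")) // 3: the deleted document still counts

	segs = []*segment{merge(segs)}
	fmt.Println(docFreq(segs, "raven")) // 2: fixed by the merge
}
```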

So now I have a pretty good idea about how it handled the overall term frequency. Note that this just gives you the number of times this term has been seen across all documents. What about another important quality, the number of times this term appears in a specific document? Those are stored in the frq file, and they are accessible during queries. This is then used in conjunction with the overall term frequency to generate the boost factor per result.

Peeking into Lucene indexing


Continuing my trip into the Lucene codebase, now I’m looking into how the indexing process happens. Interestingly enough, that is something that we never really had to look at before.

It is quite clear that Lucene is heavily meant for utilizing additional threads for improving overall indexing speed. You can see it in the amount of per thread state that exists all over the indexing code. That is also meant to reduce memory consumption, as far as I can see.

image

The real stuff happens in the ProcessDocument() where we have a chain of DocFieldConsumerPerThread and TermsHashConsumerPerThread which actually do the work.

Then the real work is happening on a per field level in DocInverterPerField, where the analyzer is actually called. The process in which the analyzers return values for the text is interesting. There are “attributes” that are added to the stream, per token, and they are used to get the relevant values. I assume that this allows you to have different levels of analyzers.

image

This way, you can have analyzers that don’t deal with offsets or positions, etc. And the actual processing of this is done in a chain that appears to be:

  • Freq Prox
  • Term Vectors

But that isn’t something that I really care about now. I want to see how Lucene writes the actual terms. This is being written in the TermsInfoWriter, which took some time to find.

image

Terms are stored in a prefix compressed mode (sorted, obviously). There are the actual terms, and an index into the terms, which allows for faster seeking into the file. This is actually done here:

image

This is a single term written to the file. A lot of the stuff Lucene does (prefixes, VInt, etc) is done in the name of conserving space, and it reminds me greatly of LevelDB’s SST. In fact, the way terms are stored is pretty much an SST, except that this happens to be on multiple files. Pretty much the entire behavior and all the implications are the same.
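To show what prefix compression of sorted terms looks like, here is a minimal Go sketch. It is not Lucene’s on-disk layout (the real writer also stores field numbers, frequencies and pointers); it only encodes each term as a varint shared prefix length, a varint suffix length and the suffix bytes, using the varint encoding from Go’s encoding/binary package.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// sharedPrefix returns how many leading bytes a and b have in common.
func sharedPrefix(a, b string) int {
	n := 0
	for n < len(a) && n < len(b) && a[n] == b[n] {
		n++
	}
	return n
}

// encodeTerms expects terms in sorted order, so neighbors share long prefixes.
func encodeTerms(sorted []string) []byte {
	var buf bytes.Buffer
	tmp := make([]byte, binary.MaxVarintLen64)
	prev := ""
	for _, t := range sorted {
		p := sharedPrefix(prev, t)
		buf.Write(tmp[:binary.PutUvarint(tmp, uint64(p))])
		buf.Write(tmp[:binary.PutUvarint(tmp, uint64(len(t)-p))])
		buf.WriteString(t[p:])
		prev = t
	}
	return buf.Bytes()
}

// decodeTerms rebuilds each term from the previous one plus its suffix.
func decodeTerms(data []byte) ([]string, error) {
	r := bytes.NewReader(data)
	var out []string
	prev := ""
	for r.Len() > 0 {
		p, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		n, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		if p > uint64(len(prev)) || n > uint64(r.Len()) {
			return nil, errors.New("corrupt term data")
		}
		suffix := make([]byte, n)
		if _, err := io.ReadFull(r, suffix); err != nil {
			return nil, err
		}
		prev = prev[:p] + string(suffix)
		out = append(out, prev)
	}
	return out, nil
}

func main() {
	data := encodeTerms([]string{"raven", "ravendb", "rhino"})
	terms, err := decodeTerms(data)
	fmt.Println(len(data), terms, err)
}
```

Because each term depends on the one before it, you cannot jump into the middle of the stream, which is exactly why a separate index into the terms is needed for seeking.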

It also means that searching on this is fast, because the data is sorted, but pretty complex. And it also explains a lot about the actual exposed API that it has. I think that I have a pretty good idea on how things work now. I want to now go back up and look at specific parts of how it works… but that is going to be the next post.

The Lucene disk format


I realized lately that I wanted to know a lot more about exactly how Lucene is storing data on disk. Oh, I know the general stuff about segments and files, etc. But I wanted to know the actual bits & bytes. So I started tracing into Lucene and trying to figure out what it is doing.

And, by the way, the only thing that the Lucene.NET codebase is missing is this sign:

image

At any rate, this is how Lucene writes the segment file. Note that this is done in a CRC32 signed file:

image

And the info write method is:

image

Today, I would probably use a JSON file for something like that (bonus point, you know if it is corrupted and it is human readable), but this code was written in 2001, so that explains it.

This is the format of a segment file, and the segments.gen file is generated using:

image

Moving on to actually writing data, I created ten Lucene documents and wrote them. Then just debugged through the code to see what will happen. It started by creating _0.fdx and _0.fdt files. The .fdt is for fields, the fdx is for field indexes.

Both of those files are used when writing the stored fields. This is the empty operation, writing an unstored field.

image

This is how fields are actually stored:

image

And then it ends up in:

image

Note that this particular data goes in the fdt file, while the fdx appears to be a quick way to go from a known document id to the relevant position in the fdt file.
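The relationship between the two files can be sketched in Go as follows. This is a simplified layout of my own (one string value per document, held in memory buffers, with made up names), taking the fdx to be a fixed width table of offsets into the variable length fdt data.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// storedFields is a toy stand-in for the .fdt/.fdx pair.
type storedFields struct {
	fdt bytes.Buffer // variable length records: varint length + bytes
	fdx bytes.Buffer // one fixed 8 byte offset into fdt per document
}

// add appends a document's value and records where it starts.
func (s *storedFields) add(value string) {
	var off [8]byte
	binary.BigEndian.PutUint64(off[:], uint64(s.fdt.Len()))
	s.fdx.Write(off[:])

	var n [binary.MaxVarintLen64]byte
	s.fdt.Write(n[:binary.PutUvarint(n[:], uint64(len(value)))])
	s.fdt.WriteString(value)
}

// get finds a document in constant time: doc id -> fdx slot -> fdt offset.
func (s *storedFields) get(doc int) (string, bool) {
	idx := s.fdx.Bytes()
	if doc < 0 || (doc+1)*8 > len(idx) {
		return "", false
	}
	off := binary.BigEndian.Uint64(idx[doc*8:])
	data := s.fdt.Bytes()[off:]
	n, w := binary.Uvarint(data)
	if w <= 0 || uint64(len(data)-w) < n {
		return "", false
	}
	return string(data[w : w+int(n)]), true
}

func main() {
	var s storedFields
	s.add("users/1")
	s.add("a much longer stored value")
	s.add("x")
	fmt.Println(s.get(1))
}
```

Because every fdx slot has the same size, finding document N is a multiplication rather than a scan, no matter how uneven the stored values are.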

As I was going through the code, I did some searches, and found a very detailed explanation of the actual file format in the docs. That is really nice and quite informative. However, just seeing how the “let us take the documents and make them searchable” part works is quite interesting. Lucene has a lot of chains of responsibility going through it. And it is also quite interesting to see the design choices that were made.

Unfortunately, Lucene is very much wedded to its file format, and making changes to it isn’t going to be possible, which is a shame, since it impacts quite a lot of the way Lucene works in general.

Raft, as the Raven flies


The interesting thing about the etcd / go-raft review wasn’t so much what they did, they did things quite beautifully, but the things that I wanted them to do that they didn’t do. The actual implementation was fascinating, and for the purposes of what is required, more than adequate. I, however, have a very specific goal in mind, and that means that this is really not going to work with the given implementations.

I’d better explain first. I’m not sure if go-raft was written by the same people as etcd, but they do “smell” very much the same. It just occurred to me that yes, they are actually both on github, so I checked, and it appears that I was correct. xiangli-cmu, philips, benbjohnson and bcwaldon (among many others) have worked on both projects. The reason this is important is that it reinforces my feeling that go-raft was written mainly to serve the needs of etcd. Anyway…

The reason that I think that is that the implementation is very much optimized at the level etcd needs it. What I mean is that there is a lot of work there to support multiple concurrent operations that would be batched to all the nodes in the cluster, then be applied in the in memory model. This works because etcd is a set of key/value pairs that are shared among many nodes, but they are usually very small values, and a very small number of keys. I would be surprised if there were hundreds of thousands of keys in a single etcd installation. That just isn’t the type of thing that it is meant to do. And that reflects throughout.

For example, there is an inherent assumption throughout the codebase that the data set is small, that each value is small, etc. When sending a snapshot over the wire, there is the assumption that it can all fit in memory and that this is a good thing to do. Even more to the point, the system where we only push entries to the nodes on heartbeats is great when you are trying to batch multiple changes together, with each change being independent of all the others (usually). However, that really doesn’t work when what you actually need is something like the sort of things that we usually deal with.

I want to emphasize that I’m really evaluating the way etcd & go-raft are built and finding them wanting for my own purposes. I think that they are doing quite great for the kind of problem that they are meant to solve.

Let us consider the kind of databases that we have. We usually have to deal with much larger values, typical documents are in the KB range, and are frequently hundreds of KB in size. We also tend to have quite a lot of them. Trying to put them all in memory (the etcd way) would be… suboptimal. Now, one option would be to try and do just that, and have each operation in RavenDB (put/delete documents) as a state machine command in Raft. The problem is that this really gets complicated very quickly, leaving aside the fact that it isn’t a general solution, and we really want to try to get to a general solution.

We don’t want to solve this once for RavenDB, and then for RavenFS, and then for… etc. And the reason that working at that level is tricky is that you need to push everything through the state machine, and everything in this case really does mean everything. That leads to a very different database design and implementation. It is also probably not really necessary. We can drop this a level or two down and instead of dealing at the database level, we can deal with that at the storage layer level. This is basically just an extension of the log shipping idea. Instead of using Raft for high level commands, just use Raft to create a consensus around the sequence of writes to the journal. That means that all the nodes will have the same journal, and thus have the same data in the storage.

That in turn means that an “entry” can actually be pretty large (multi MB range, sometimes). And because the journal is purely sequential, we need to issue that command to all the nodes as soon as it is submitted to the Raft state machine.

Now, the reason that the go-raft implementation waits for the heartbeat is that it batches things up nicely, and really speeds up the general case. But we don’t need to do that for what we are going to do. Or, to be rather more exact, we have already done just that. That is basically what Voron’s transaction merging is all about. And since we can only handle writes as the leader, and since we will merge concurrent writes anyway, that is going to end up as a very similar behavior under load, and faster without load. At least that is what the initial thinking is saying.

Snapshots, also something quite important for Raft, can be handled very simply by just doing a backup/restore over the network, by streaming all the data.

And the rest is just implementing Raft :-)

Reviewing go-raft, part II


In my previous post, I started to go over the go-raft implementation. After being interrupted by the need to sleep, I decided to go on with this, but I wanted to expand a bit first on the issue we discussed earlier: not checking the number of bytes read in log_entry’s Decode.

image

Let us assume that we actually hit that issue, what would happen?

image

The process goes like this: we try to read a value, but the Read method only returns some of the information. We explicitly ignore that, and try to use the buffer anyway. Best case scenario, we actually get an error, so we bail early. At that point, we detect the error and truncate the file. Hello data loss, nice to see you. For fun, this is the best case scenario. It is worse if we unmarshal the partial data without an error. Then we don’t have the case of “oh, we have a node that is somehow way behind”, we have the case of “this node actually applied different commands than anyone else”.

I reported this issue, and I’m interested to know if my review is in any way correct. With that said, let us move on…

getEntriesAfter gives us all the in memory entries. That is quite similar to how RavenDB handled indexing, for that matter, so it is amusing. But this applies only to in memory stuff, and it is quite interesting to see how this will interact with other parts of the codebase.

setCommitIndex is interesting. In my head, committing something means flushing it to disk. But in Raft’s terms, committing something means applying the commands. The reason it is interesting is that it has some interesting comments on edge cases. So far, I haven’t seen it actually writing to disk, mind.

And this one gives me a headache:

image

Basically, this means that we need to write the commit index to the beginning of the file. It is also an extremely unsafe operation. What happens if you crash immediately after? Did your change go through, or not? For that matter, there is nothing that prevents the OS from first writing the changes you made to the beginning of the file, then whatever else you wrote at the end. So a crash might actually leave you with the commit pointer pointing at corrupted data. Luckily, I don’t see anything there actually calling this, though.

The truncate method makes my head ache, mostly because it does things like delete data, which makes me itchy. This is called from the server code as part of normal processing of the append entries request. What this does, in effect, is to say something like: I want you to apply this log entry, make sure that your previous log entry is this, and if it isn’t, revert it back to this entry. This is how Raft ensures that all the logs are the same across the cluster.
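To make the “make sure that your previous log entry is this” rule concrete, here is a minimal in memory sketch in Go. It follows the consistency check as the Raft paper describes it, not go-raft’s actual code, and the entry and raftLog types are hypothetical.

```go
package main

import "fmt"

// entry is a hypothetical log entry; indexes start at 1.
type entry struct {
	index, term uint64
	command     string
}

// raftLog keeps entries in order, so entries[i].index == i+1.
type raftLog struct {
	entries []entry
}

// appendEntries is the follower side of the check: refuse unless the entry
// at prevIndex has prevTerm, then drop any conflicting suffix and append.
func (l *raftLog) appendEntries(prevIndex, prevTerm uint64, newEntries []entry) bool {
	if prevIndex > uint64(len(l.entries)) {
		return false // we are missing entries, the leader has to back up
	}
	if prevIndex > 0 && l.entries[prevIndex-1].term != prevTerm {
		return false // our history diverges before the new entries
	}
	for i, e := range newEntries {
		pos := int(prevIndex) + i
		if pos < len(l.entries) {
			if l.entries[pos].term == e.term {
				continue // already have this entry
			}
			l.entries = l.entries[:pos] // conflict: this is the truncate step
		}
		l.entries = append(l.entries, newEntries[i:]...)
		break
	}
	return true
}

func main() {
	// The follower holds a stale entry at index 3 from an old leader (term 2).
	l := &raftLog{entries: []entry{{1, 1, "a"}, {2, 1, "b"}, {3, 2, "stale"}}}

	ok := l.appendEntries(2, 1, []entry{{3, 3, "c"}})
	fmt.Println(ok, l.entries)

	// A request that assumes entries we do not have is rejected.
	fmt.Println(l.appendEntries(5, 3, nil))
}
```

The deletion only ever removes entries that disagree with the leader, and Raft guarantees that committed entries never do, which is what makes the truncate safe in theory, itchy as it is.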

Then we have this:

   1:  // Appends a series of entries to the log.
   2:  func (l *Log) appendEntries(entries []*protobuf.LogEntry) error {
   3:      l.mutex.Lock()
   4:      defer l.mutex.Unlock()
   5:   
   6:      startPosition, _ := l.file.Seek(0, os.SEEK_CUR)
   7:   
   8:      w := bufio.NewWriter(l.file)
   9:   
  10:      var size int64
  11:      var err error
  12:      // Append each entry but exit if we hit an error.
  13:      for i := range entries {
  14:          logEntry := &LogEntry{
  15:              log:      l,
  16:              Position: startPosition,
  17:              pb:       entries[i],
  18:          }
  19:   
  20:          if size, err = l.writeEntry(logEntry, w); err != nil {
  21:              return err
  22:          }
  23:   
  24:          startPosition += size
  25:      }
  26:      w.Flush()
  27:      err = l.sync()
  28:   
  29:      if err != nil {
  30:          panic(err)
  31:      }
  32:   
  33:      return nil
  34:  }

This seems pretty easy to follow, all told. But note the call to sync() there in line 27, and the fact that this translates down to an fsync, which is horrible for performance.

There is also appendEntry, which appears to be doing the exact same thing as appendEntries and writeEntry. I’m guessing that the difference is that appendEntries is called for a follower, and appendEntry is for a leader.

The last thing to go through in the log.go file is the compact function, which is… interesting:

// compact the log before index (including index)
func (l *Log) compact(index uint64, term uint64) error {
    var entries []*LogEntry

    l.mutex.Lock()
    defer l.mutex.Unlock()

    if index == 0 {
        return nil
    }
    // nothing to compaction
    // the index may be greater than the current index if
    // we just recovery from on snapshot
    if index >= l.internalCurrentIndex() {
        entries = make([]*LogEntry, 0)
    } else {
        // get all log entries after index
        entries = l.entries[index-l.startIndex:]
    }

    // create a new log file and add all the entries
    new_file_path := l.path + ".new"
    file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
    if err != nil {
        return err
    }
    for _, entry := range entries {
        position, _ := l.file.Seek(0, os.SEEK_CUR)
        entry.Position = position

        if _, err = entry.Encode(file); err != nil {
            file.Close()
            os.Remove(new_file_path)
            return err
        }
    }
    file.Sync()

    old_file := l.file

    // rename the new log file
    err = os.Rename(new_file_path, l.path)
    if err != nil {
        file.Close()
        os.Remove(new_file_path)
        return err
    }
    l.file = file

    // close the old log file
    old_file.Close()

    // compaction the in memory log
    l.entries = entries
    l.startIndex = index
    l.startTerm = term
    return nil
}

This code can’t actually run on Windows, which is interesting. The issue is that it tries to rename an open file on top of another file that is also open, and Windows does not allow that.
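One way around this, as a rough sketch only: close both handles before the rename, then reopen the log. The replaceLog and demoReplaceLog names are hypothetical and not part of go-raft, and I am assuming the caller already holds the log mutex, so nobody notices the short window in which the file is closed.

```go
package main

import (
	"os"
	"path/filepath"
)

// replaceLog is a hypothetical helper (not part of go-raft). It closes both
// handles before renaming, because Windows refuses to rename a file that is
// still open, and then reopens the log for appending.
func replaceLog(oldFile, newFile *os.File, path string) (*os.File, error) {
	newPath := newFile.Name()

	// Make sure the new contents are on disk before they replace the old log.
	if err := newFile.Sync(); err != nil {
		newFile.Close()
		return nil, err
	}
	if err := newFile.Close(); err != nil {
		return nil, err
	}
	if err := oldFile.Close(); err != nil {
		return nil, err
	}
	if err := os.Rename(newPath, path); err != nil {
		return nil, err
	}
	return os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
}

// demoReplaceLog exercises replaceLog in a temporary directory and returns
// the contents of the log after the swap.
func demoReplaceLog() (string, error) {
	dir, err := os.MkdirTemp("", "compact")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "log")
	oldFile, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return "", err
	}
	oldFile.WriteString("old entries")

	newFile, err := os.OpenFile(path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		oldFile.Close()
		return "", err
	}
	newFile.WriteString("entries after snapshot")

	f, err := replaceLog(oldFile, newFile, path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	data, err := os.ReadFile(path)
	return string(data), err
}
```

The cost is that the log has no open handle for a moment, which is only acceptable because compaction already runs under the lock.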

But the interesting thing here is what this does. We have the log file, which is the persisted state of the in memory entries collection. Every now and then, we compact it by creating a snapshot, and then we create a new file, with only the entries after the newly created snapshot position.

So far, so good, and that gives me a pretty good feeling regarding how the whole thing is structured. Next in line is the peer.go file. This represents a node’s idea of what is going on in another node in the cluster. I find the heartbeat code really interesting:

// Starts the peer heartbeat.
func (p *Peer) startHeartbeat() {
    p.stopChan = make(chan bool)
    c := make(chan bool)
    go p.heartbeat(c)
    <-c
}

// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat(flush bool) {
    p.stopChan <- flush
}

// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat(c chan bool) {
    stopChan := p.stopChan

    c <- true

    ticker := time.Tick(p.heartbeatInterval)

    debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)

    for {
        select {
        case flush := <-stopChan:
            if flush {
                // before we can safely remove a node
                // we must flush the remove command to the node first
                p.flush()
                debugln("peer.heartbeat.stop.with.flush: ", p.Name)
                return
            } else {
                debugln("peer.heartbeat.stop: ", p.Name)
                return
            }

        case <-ticker:
            start := time.Now()
            p.flush()
            duration := time.Now().Sub(start)
            p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
        }
    }
}

startHeartbeat starts a new heartbeat goroutine, and then waits until the heartbeat function notifies it that it has started running.
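Stripped of the Raft details, the handshake looks roughly like the sketch below. The worker type and its fields are hypothetical names of mine, not go-raft code. The point of blocking on the started channel is that the goroutine copies stopChan into a local variable first, so by the time start returns, the goroutine is running and holds its own reference to the channel.

```go
package main

// worker is a hypothetical stand-in for Peer, reduced to the start/stop
// handshake.
type worker struct {
	stopChan chan bool
	done     chan struct{}
	runs     int
}

// start launches the goroutine and blocks until it reports that it is running.
func (w *worker) start() {
	w.stopChan = make(chan bool)
	w.done = make(chan struct{})
	started := make(chan struct{})
	go w.run(started, w.done)
	<-started
}

// run captures the stop channel, signals that it is ready, and then waits
// for the stop signal.
func (w *worker) run(started chan<- struct{}, done chan<- struct{}) {
	stopChan := w.stopChan
	close(started)

	<-stopChan
	w.runs++
	close(done)
}

// stop signals the goroutine and waits for it to exit.
func (w *worker) stop() {
	w.stopChan <- false
	<-w.done
}
```

Unlike the original, this version also waits for the goroutine to exit in stop, which I added only so the sketch is easy to check.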

What is confusing is the reference to the peer’s server. Peer is defined as:

// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
    server            *server
    Name              string `json:"name"`
    ConnectionString  string `json:"connectionString"`
    prevLogIndex      uint64
    mutex             sync.RWMutex
    stopChan          chan bool
    heartbeatInterval time.Duration
}

And it seems logical to think that this is a remote peer’s server, but this is actually the local server reference, not the remote one. Note that it is actually the flush method that does the remote call.

Flush is defined as:

func (p *Peer) flush() {
    debugln("peer.heartbeat.flush: ", p.Name)
    prevLogIndex := p.getPrevLogIndex()
    term := p.server.currentTerm

    entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)

    if entries != nil {
        p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
    } else {
        p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
    }
}

The interesting thing here is that the entries collection might be empty (in which case this serves as just a heartbeat). Another thing that comes to mind is that this has the leader explicitly instructing a follower to generate snapshots. The Raft paper suggested that this is something that would happen locally on each server, on an independent basis.

There is a lot of interesting behavior in sendAppendEntriesRequest(), not so much in what it does, as in how it handles replies. There is a lot of state going on there. It’s very well commented, so I’ll let you read it; nothing that is going on there is actually complex.

What is fascinating is that while the transport layer for go-raft is HTTP, which is inherently request/response, it actually handles this in an interesting fashion:

  • Requests are synchronous
  • On reply, the in memory state of the peer is updated immediately
  • The response from the peer is queued to be handled by the server event loop

The end result is that a lot of the handling is centralized into a really pretty state machine. The rest of what is going on there is not very interesting, except for snapshots, but those are covered elsewhere.
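To make the shape of that concrete, here is a minimal sketch of the pattern, not the go-raft code itself. The response, peer, sendAppend and eventLoop names are all made up for illustration, and the send function stands in for the HTTP round trip.

```go
package main

// response is a hypothetical reply to an append entries request.
type response struct {
	peer    string
	success bool
	index   uint64
}

// peer holds the local view of a remote node.
type peer struct {
	name         string
	prevLogIndex uint64
}

// sendAppend performs the request synchronously, updates the peer's in-memory
// state right away, and then queues the reply for the server loop.
func (p *peer) sendAppend(send func(prevLogIndex uint64) response, events chan<- response) {
	resp := send(p.prevLogIndex) // blocks, like an HTTP request/response
	if resp.success {
		p.prevLogIndex = resp.index
	}
	events <- resp
}

// eventLoop handles n queued replies on a single goroutine and returns the
// highest index that any peer acknowledged.
func eventLoop(events <-chan response, n int) uint64 {
	var highest uint64
	for i := 0; i < n; i++ {
		if r := <-events; r.success && r.index > highest {
			highest = r.index
		}
	}
	return highest
}

// demoEventLoop wires two fake peers to the loop.
func demoEventLoop() uint64 {
	events := make(chan response, 2)
	acks := map[string]uint64{"a": 5, "b": 7}
	for _, p := range []*peer{{name: "a"}, {name: "b"}} {
		p := p
		go p.sendAppend(func(uint64) response {
			return response{peer: p.name, success: true, index: acks[p.name]}
		}, events)
	}
	return eventLoop(events, 2)
}
```

Because all the replies funnel through one channel, the server state is only ever touched from the loop, even though the requests themselves run on one goroutine per peer.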

And now, we are ready to actually go and look at the server code, but… not yet. It is over a thousand lines of code, so I think that I’ll go over other stuff first. In particular, snapshotting looks interesting.

image

This is actually quite depressing. Note the State properties here. There is an implicit assumption that it is possible / advisable to go with the entire in memory state like that. I know that I am sensitive to such things, but that seems like an awful lot of waste when talking about large systems.

Here is one such issue:

image

Let us assume that our state is big, hundreds of MB or maybe a few GB in size.

We currently hold it in memory inside Snapshot.State, then we marshal that to JSON. Now, I actually had to go and check, but Go’s json package does the usual thing and encodes a byte array as a base64 string. What that means, in turn, is that you have an overhead of about 33% (every 3 bytes become 4 characters) that you have to deal with, and this is all allocated in main memory. And then you write it to a file.

This is… quite insane, to be frank.

Assuming that I have a state that is 100 MB in size, I’m going to hold all of that in memory, then allocate roughly another 133 MB just to hold the JSON, then write it to a file. Why not write it to a file directly in the first place? (You could compute the CRC along the way.)
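Something along these lines is what I have in mind, as a sketch rather than a drop-in replacement. The writeSnapshot and demoWriteSnapshot names are mine, and I am assuming the state can be exposed as an io.Reader instead of a single byte array.

```go
package main

import (
	"bytes"
	"hash/crc32"
	"io"
	"strings"
)

// writeSnapshot streams the state to w and computes a CRC32 of the bytes as
// they go by, so the full state never has to be buffered or re-encoded.
func writeSnapshot(w io.Writer, state io.Reader) (int64, uint32, error) {
	h := crc32.NewIEEE()
	n, err := io.Copy(io.MultiWriter(w, h), state)
	return n, h.Sum32(), err
}

// demoWriteSnapshot writes a small state into an in-memory buffer (standing
// in for the snapshot file) and reports the byte count and whether both the
// contents and the checksum match.
func demoWriteSnapshot(state string) (int64, bool) {
	var file bytes.Buffer
	n, sum, err := writeSnapshot(&file, strings.NewReader(state))
	ok := err == nil &&
		file.String() == state &&
		sum == crc32.ChecksumIEEE([]byte(state))
	return n, ok
}
```

In real use, w would be the snapshot file; the only data held in memory beyond the state itself is whatever small buffer io.Copy uses.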

The whole thing appears to be assuming small sizes of data. Throughout the entire codebase, actually.

And now, I have no other way to avoid it, we are going into server.go itself…

There is a lot of boilerplate stuff there, but the first interesting thing happens when we look at how to apply the log:

image

This says, when we need to apply it, execute the command method on my state. A lot of the other methods are some variant of:

image

Nothing to see here at all.

And we finally get to the key part, the event loop:

image

Let us look in detail at the followerLoop. Inside that function, we have a loop that waits for:

  1. Stop signal, which would lead to us shutting down…
  2. We got an event on our queue…
  3. The timeout for an event has expired…
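As a rough sketch of that three-way wait (my own simplification, not the go-raft code): the event type, the handle callback and the returned strings are hypothetical, and I reset the timer on every event, which the real loop does not necessarily do.

```go
package main

import "time"

// event is a hypothetical placeholder for whatever arrives on the server queue.
type event struct{ name string }

// followerLoop waits for a stop signal, a queued event, or the election
// timeout, and returns a string describing why it exited.
func followerLoop(stop <-chan struct{}, events <-chan event, electionTimeout time.Duration, handle func(event)) string {
	timer := time.NewTimer(electionTimeout)
	defer timer.Stop()

	for {
		select {
		case <-stop:
			return "stopped"

		case e := <-events:
			handle(e)
			// Simplification: any event counts as hearing from the leader,
			// so restart the timeout. Drain the channel if the timer fired.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(electionTimeout)

		case <-timer.C:
			// Nothing heard for a full timeout: time to stand for election.
			return "candidate"
		}
	}
}
```

A caller would run this until it returns and then switch on the result to decide which loop to enter next.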

There is one part there that puzzles me:

image

promotable will return true if the log has any entries at all. I’m not really sure why that is the case, to be honest. In particular, what about the case when we start with an empty server? I’m going to go on reading the code, and we’ll see where it leads us. And it leads us to:

image

So next we need to figure out what this self join stuff is. I am not sure if that is something that comes from Raft or from an external source. I found this issue that discusses this, but it isn’t very helpful in terms of understanding who issues the self join command. I tried looking at the etcd codebase, but I didn’t find anything so far. I’ll leave it for now.

The rest of the operations are basically just forwarding the calls to the appropriate methods if they are in the allowed state.

The candidateLoop method isn’t anything special, it follows the Raft paper pretty nicely, although I have to admit the “candidate becomes follower upon Append Entries command” is buried deep. The same is true for the other behaviors. The appropriate state based responses are sometimes hard to figure out, because you have the state loop, and then you have the same apparent behavior everywhere. For example, we need to become a follower if we get an Append Entries request. That happens in processAppendEntriesRequest(), but it would actually be easier to see this if we had code duplication. This is a case where getting familiar with the codebase would help in understanding it, and I don’t think that this would be a change worth doing, anyway.

Probably the most interesting behavior is in the leaderLoop when we process a command. A command is added to the server queue using a Do(Command) method. It is then processed in processCommand.

The problem here is that commands are actually appended to the log, and then sent to peers on the heartbeat interval. By default, that stands at 50 ms.

This is great and all, but it does mean that the latency for requests is going to suffer. This doesn’t matter that much for something like etcd. The assumption here is that the requests are all going to be on different things, so we can queue a lot of commands and get pretty good speed overall. It is a problem if, in our system, we have to process sequential operations. In that mode, we can’t wait until the heartbeat, and we want to process this right away. I’ll discuss this later, I think. It is a very important property of this implementation (but not of Raft in general).
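To illustrate the alternative, here is a sketch of a heartbeat loop that also flushes as soon as it is told there is new work. This is not how go-raft does it; the replicate function and the kick channel are hypothetical, and flush stands in for the peer’s flush method.

```go
package main

import "time"

// replicate flushes on every heartbeat tick, and also immediately whenever a
// value arrives on kick, so a new command does not wait for the next tick.
func replicate(stop <-chan struct{}, kick <-chan struct{}, interval time.Duration, flush func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-kick:
			flush()
		case <-ticker.C:
			flush()
		}
	}
}

// demoReplicate uses a very long heartbeat interval, so the only flush that
// can happen is the one triggered by the kick. It returns the flush count.
func demoReplicate() int {
	stop := make(chan struct{})
	kick := make(chan struct{})
	flushed := make(chan struct{}, 1)
	done := make(chan struct{})
	count := 0

	go func() {
		replicate(stop, kick, time.Hour, func() {
			count++
			flushed <- struct{}{}
		})
		close(done)
	}()

	kick <- struct{}{} // a new command arrived
	<-flushed          // it was sent without waiting for the heartbeat
	close(stop)
	<-done
	return count
}
```

The trade-off is more, smaller requests under load, so a real implementation would probably want to batch whatever has accumulated by the time the kick is handled.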

Looking at the snapshot state, this happens when a follower gets a SnapshotRequest, but I don’t see anything that sends it. Maybe it is another caller originated thing?

I just looked at the etcd source, and I think that I confirmed that both behaviors are there in the etcd source. So I think that that explains it.

And this is it, basically. I have some thoughts about the implementation of this and of etcd, but I think that this is enough for now… I’ll post them in my next post.
