Ayende @ Rahien

Refunds available at head office

Reviewing go-raft, part II

In my previous post, I started to go over the go-raft implementation, after being interrupted by the need to sleep, I decided to go on with this, but I wanted to expand a bit first about the issue we discussed earlier, not checking the number of bytes read in log_entry’s Decode.

image

Let us assume that we actually hit that issue, what would happen?

image

The process goes like this, we try to read a value, but the Read method only return some of the information. We explicitly ignore that, and try to use the buffer anyway. Best case scenario, we are actually getting an error, so we bail early. At that point, we detect the error and truncate the file. Hello data loss, nice to see you. For fun, this is the best case scenario. It is worse if we marshal the partial data without an error. Then we have not the case of “oh, we have a node that is somehow way behind”, we have the case of “this node actually applied different commands than anyone else”.

I reported this issue, and I’m interested to know if my review is in any way correct. With that said, let us move on…

getEntriesAfter gives us all the in memory entries. That is quite similar to how RavenDB handled indexing, for that matter, so it is amusing. But this applies only to in memory stuff, and it is quite interesting to see how this will interact with other parts of the codebase.

setCommitIndex is interesting. In my head, committing something means flushing them to disk. But in Raft’s term. Committing something means applying the commands. But the reason it is interesting is that it has some interesting comments on edge cases. So far, I haven’t see actually writing to disk, mind.

And this one gives me a headache:

image

Basically, this mean that we need to write the commit index to the beginning of the file. It is also an extremely unsafe operation. What happens if you crash immediately after? Did you change go through, or not? For that matter, there is nothing that prevents the OS from first writing the changes you made to the beginning of the file then whatever else you wrote at the end. So a crash might actually leave you with the commit pointer pointing at corrupted data. Luckily, I don’t see anything there actually calling this, though.

The truncate method makes my head ache, mostly because it does things like delete data, which makes my itchy. This is called from the server code as part of normal processing of the append entries request. What this does, in effect, is to say something like: I want you to apply this log entry, make sure that your previous log entry is this, and if it isn’t, revert it back to this entry.  This is how Raft ensure that all the logs are the same across the cluster.

Then we have this:

   1:  // Appends a series of entries to the log.
   2:  func (l *Log) appendEntries(entries []*protobuf.LogEntry) error {
   3:      l.mutex.Lock()
   4:      defer l.mutex.Unlock()
   5:   
   6:      startPosition, _ := l.file.Seek(0, os.SEEK_CUR)
   7:   
   8:      w := bufio.NewWriter(l.file)
   9:   
  10:      var size int64
  11:      var err error
  12:      // Append each entry but exit if we hit an error.
  13:      for i := range entries {
  14:          logEntry := &LogEntry{
  15:              log:      l,
  16:              Position: startPosition,
  17:              pb:       entries[i],
  18:          }
  19:   
  20:          if size, err = l.writeEntry(logEntry, w); err != nil {
  21:              return err
  22:          }
  23:   
  24:          startPosition += size
  25:      }
  26:      w.Flush()
  27:      err = l.sync()
  28:   
  29:      if err != nil {
  30:          panic(err)
  31:      }
  32:   
  33:      return nil
  34:  }

This seems pretty easy to follow, all told. But note the call to sync() there in line 27. And the fact that this translate down to an fsync, which is horrible for performance.

There is also appendEntry, which appears to be doing the exact same thing as appendEntries and writeEntry. I’m guessing that the difference is that appendEntries is called for a follower, and appendEntry is for a leader.

The last thing to go through in the log.go file is the compact function, which is… interesting:

   1:  // compact the log before index (including index)
   2:  func (l *Log) compact(index uint64, term uint64) error {
   3:      var entries []*LogEntry
   4:   
   5:      l.mutex.Lock()
   6:      defer l.mutex.Unlock()
   7:   
   8:      if index == 0 {
   9:          return nil
  10:      }
  11:      // nothing to compaction
  12:      // the index may be greater than the current index if
  13:      // we just recovery from on snapshot
  14:      if index >= l.internalCurrentIndex() {
  15:          entries = make([]*LogEntry, 0)
  16:      } else {
  17:          // get all log entries after index
  18:          entries = l.entries[index-l.startIndex:]
  19:      }
  20:   
  21:      // create a new log file and add all the entries
  22:      new_file_path := l.path + ".new"
  23:      file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
  24:      if err != nil {
  25:          return err
  26:      }
  27:      for _, entry := range entries {
  28:          position, _ := l.file.Seek(0, os.SEEK_CUR)
  29:          entry.Position = position
  30:   
  31:          if _, err = entry.Encode(file); err != nil {
  32:              file.Close()
  33:              os.Remove(new_file_path)
  34:              return err
  35:          }
  36:      }
  37:      file.Sync()
  38:   
  39:      old_file := l.file
  40:   
  41:      // rename the new log file
  42:      err = os.Rename(new_file_path, l.path)
  43:      if err != nil {
  44:          file.Close()
  45:          os.Remove(new_file_path)
  46:          return err
  47:      }
  48:      l.file = file
  49:   
  50:      // close the old log file
  51:      old_file.Close()
  52:   
  53:      // compaction the in memory log
  54:      l.entries = entries
  55:      l.startIndex = index
  56:      l.startTerm = term
  57:      return nil
  58:  }

This code can’t actually run on Windows. Which is interesting. The issue here is that it is trying to rename a file that is open on top of another file which is open. Windows does not allow it.

But the interesting thing here is what this does. We have the log file, which is the persisted state of the in memory entries collection. Every now and then, we compact it by creating a snapshot, and then we create a new file, with only the entries after the newly created snapshot position.

So far, so good, and that gives me a pretty good feeling regarding how the whole thing is structured. Next in line, the peer.go file. This represent a node’s idea about what is going on in the another node in the cluster. I find the heartbeat code really interesting:

// Starts the peer heartbeat.
func (p *Peer) startHeartbeat() {
    p.stopChan = make(chan bool)
    c := make(chan bool)
    go p.heartbeat(c)
    <-c
}

// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat(flush bool) {
    p.stopChan <- flush
}

// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat(c chan bool) {
    stopChan := p.stopChan

    c <- true

    ticker := time.Tick(p.heartbeatInterval)

    debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)

    for {
        select {
        case flush := <-stopChan:
            if flush {
                // before we can safely remove a node
                // we must flush the remove command to the node first
                p.flush()
                debugln("peer.heartbeat.stop.with.flush: ", p.Name)
                return
            } else {
                debugln("peer.heartbeat.stop: ", p.Name)
                return
            }

        case <-ticker:
            start := time.Now()
            p.flush()
            duration := time.Now().Sub(start)
            p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
        }
    }
}

Start heartbeat starts a new heartbeat, and then wait under the heartbeat function notify it that it has started running.

What is confusing is the reference to the peer’s server. Peer is defined as:

// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
    server            *server
    Name              string `json:"name"`
    ConnectionString  string `json:"connectionString"`
    prevLogIndex      uint64
    mutex             sync.RWMutex
    stopChan          chan bool
    heartbeatInterval time.Duration
}

And it seems logical to think that this is a remote peer’s server, but this is actually the local server reference, not the remote one. Note that it is actually the flush method that does the remote call.

Flush is defined as:

func (p *Peer) flush() {
    debugln("peer.heartbeat.flush: ", p.Name)
    prevLogIndex := p.getPrevLogIndex()
    term := p.server.currentTerm

    entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)

    if entries != nil {
        p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
    } else {
        p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
    }
}

The interesting thing here is that the entries collection might be empty (in which case this serve as just a heartbeat). Another thing that pops to mind is that this has an explicitly leader instructing follower to generate snapshots. The Raft paper suggested that this is something that would happen locally on each server on an independent basis.

There is a lot of interesting behavior in sendAppendEntriesRequest(), not so much in what it does, as in how it handles replies. There is a lot of state going on there. It’s very well commented, so I’ll let you read it, there isn’t anything that is actually going on that is complex.

What is fascinating is that while the transport layer for go-raft is HTTP, which is inherently request/response. It actually handles this in an interesting fashion:

  • Requests are synchronous
  • On reply, the in memory state of the peer is updated immediately
  • The response from the peer is queued to be handled by the server event loop

The end result is that a lot of the handling is centralized into a really pretty state machine. The rest of what is going on there is not very interesting, except for snapshots, but those are covered elsewhere.

And now, we are ready to actually go and look at the server code, but… not yet. It is over thousand lines of code, so I think that I’ll go over other stuff first. In particular, snapshotting looks interesting.

image

This is actually quite depressing. Note the State properties here. There is an implicit assumption that it is possible / advisable to go with the entire in memory state like that. I know that I am sensitive to such things, but that seems like an aweful lot of waste when talking about large systems.

Here is one such issue:

image

Let us assume that our state is big, hundreds of MB or maybe a few GB in size.

We currently hold it in memory inside the Snaphsot.STate, then we marshal that to json. Now, I actually had to go and check, but Go’s json package actually does the usual thing and encode a byte array as a base 64 formatted string. What that means, in turn, is that you have an overhead of about 25% that you have to deal with, and this is all allocated in main memory. And then you write it to a file.

This is…. quite insane, to be frank.

Assuming that I have a state that is 100 MB in size, I’m going to hold all of that in memory, then allocate another 125MB just to hold the json state, then write it to a file. Why not write it to a file directly in the first place? (You could do CRC along the way).

The whole thing appear to be assuming small sizes of data. Throughout the entire codebase, actually.

And now, I have no other ways to avoid it, we are going into the server.go itself…

There is a lot of boilerplate stuff there, but the first interesting thing happens when we look at how to apply the log:

image

This says, when we need to apply it, execute the command method on my state. A lot of the other methods are some variant of:

image

Nothing to see here at all.

And we finally get to the key part, the event loop:

image

Let us look in detail on the followerLoop. Inside that function, we have a loop that waits for:

  1. Stop signal, which would lead to us shutting down…
  2. We got an event on our queue…
  3. The timeout for an event has expired…

There is one part there that puzzles me:

image

promotable will return true if the log has any entries at all. I’m not really sure why that is the case, to be honest. In particular, what about the case when we start with an empty server. I’m going to go on reading the code, and we’ll see where it leads us. And it leads us to:

image

So next we need to figure out what is this self join stuff. I am not sure if that is something comes from Raft or from an external source. I found this issue that discusses this, but it isn’t very helpful in terms of understanding who issue the self join command. I tried looking at the etcd codebase, but I didn’t find anything so far. I’ll leave it for now.

The rest of the operations are basically just forwarding the calls to the appropriate methods if they are in the allowed state.

The caniddateLoop method isn’t anything special, it follows the Raft paper pretty nicely, although I have to admit the “candidate becomes follower upon Append Entries command” is buried deep. The same is true for the other behaviors. The appropriate state based responses sometimes are hard to figure out, because you have the state loop, then you have the same apparent behavior everywhere. For example, we need to become follower if we get an Append Entries request. That happens in processAppendEntriesRequest(), but it would actually be easier to see this if we had code duplication. This is a case where getting familiar with the codebase would help understanding it, and I don’t think that this would be a change worth doing, anyway.

Probably the most interesting behavior is in the leadershipLoop when we process a command. A command is added to the server queue using a Do(Command) method. It is then processed in processCommand.

The problem here is that commands are actually appended to the log, and then sent to peers using the heartbeat interval. By default, that stand at 50 ms.

This is great and all, but it does mean that the latency for requests is going to suffer. This doesn’t matter that much for something like etcd. The assumption here is that the requests are all going to be on different things, so we can queue a lot of commands and get pretty good speed overall. It is a problem if in our system, we have to process sequential operations. In that mode, we can’t wait until the heartbeat, and we want to process this right away. I’ll discuss this later,I think. It is a very important property of this implementation (but not for Raft in general).

Looking at the snapshot state, this happens when a follower get this a SnapshotRequest, but I don’t see anywhere that send it. Maybe it is another caller originated thing?

I just looked at the etcd source, and I think that I confirmed that both behaviors are there in the etcd source. So I think that that explains it.

And this is it, basically. I have some thoughts about the implementation of this and of etcd, but I think that this is enough for now… I’ll post them in my next post.

Comments

Kelly Sommers
03/11/2014 02:21 PM by
Kelly Sommers

Sending commands via the heartbeat is a batching optimization mentioned in the Raft paper. The performance of this decision in WAN (cross-data center) scenarios (which is a major use case for consensus algorithms) is pretty significant if you want to have any decent throughput but you also want to combine batching with pipelining.

ZooKeeper's ZAB (ZooKeeper Atomic Broadcast protocol) uses pipelining to reduce latency.

There's a great paper that covers tuning throughput and latency for Paxos which is very applicable to this very topic and discusses performance gains between one and two orders of magnitude.

http://infoscience.epfl.ch/record/165372/files/santos11%20Tuning%20Paxos%20for%20high-throughput%20with%20batching%20and%20pipelining%20-%20TR_2.pdf

Ayende Rahien
03/11/2014 02:27 PM by
Ayende Rahien

Kelly, Sure, that is great for batching, but that only work if you have individual independent actions. If you have a true log, such as all operations are required to be sequential, that is going to be a limiting factor. My interest in this is when you are using this as the mechanism for log shipping in a high availability environment.

dhasenan
03/11/2014 09:25 PM by
dhasenan

Configuration stores typically only store small documents. If you have a configuration document that's 100MB in size, you really need to be looking for more of a distributed filesystem. Google's Chubby, for instance, is a lock service that can be used as a configuration store, and it limits documents to 256KB.

etcd should have a hard-coded limit ideally.

Comments have been closed on this topic.