Ayende @ Rahien

Reviewing go-raft, part I

After going over the etcd codebase, I decided that the raft portion of it deserves a much closer look. The project is here, and I am reviewing commit: 30f261bfe873561c2c75b6206ba1f62a42dbc8d6

Again, I strongly recommend reading the Raft paper. It is quite good. At any rate, assuming that you understand Raft, let us get cracking. This time, I’m reading this in Sublime Text. As usual, I’m reading in lexicographical order, and I’m starting from append_entries.go

AppendEntries is at the very heart of Raft, so I was pleased to see it here:

// The request sent to a server to append entries to the log.
type AppendEntriesRequest struct {
    Term         uint64
    PrevLogIndex uint64
    PrevLogTerm  uint64
    CommitIndex  uint64
    LeaderName   string
    Entries      []*protobuf.LogEntry
}


// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
    pb     *protobuf.AppendEntriesResponse
    peer   string
    append bool
}

However, I didn’t really understand this code. It seemed circular, at least until I realized that we also have a whole lot of generated files. See:

[Image: the generated files in the project]

The actual protobuf semantics are (excluding a lot of stuff, of course):

message LogEntry {
    required uint64 Index=1;
    required uint64 Term=2;
    required string CommandName=3;
    optional bytes Command=4; // for nop-command
}

message AppendEntriesRequest {
    required uint64 Term=1;
    required uint64 PrevLogIndex=2;
    required uint64 PrevLogTerm=3;
    required uint64 CommitIndex=4;
    required string LeaderName=5;
    repeated LogEntry Entries=6;
}

message AppendEntriesResponse {
    required uint64 Term=1;
    required uint64 Index=2;
    required uint64 CommitIndex=3;
    required bool   Success=4;
}

So, goraft (which I always read as graft) is using protocol buffers as its wire format. Note in particular that the LogEntry contains the full content of a command, that AppendEntriesRequest has an array of them, and that the AppendEntriesResponse is set up separately. That means that it is very natural to use a one-way channel for communication. Even though we do request/response, there is a high degree of separation between the request and the reply. Indeed, from reading the code in etcd, I thought that was the case.

There is something that really bothers me, though, and I noticed it in etcd’s codebase as well. It is things like this:

// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
    pb := &protobuf.AppendEntriesRequest{
        Term:         proto.Uint64(req.Term),
        PrevLogIndex: proto.Uint64(req.PrevLogIndex),
        PrevLogTerm:  proto.Uint64(req.PrevLogTerm),
        CommitIndex:  proto.Uint64(req.CommitIndex),
        LeaderName:   proto.String(req.LeaderName),
        Entries:      req.Entries,
    }

    p, err := proto.Marshal(pb)
    if err != nil {
        return -1, err
    }

    return w.Write(p)
}

I’m not sure about the actual semantics of memory allocations in Go, but let us assume that we had a single log entry with 1KB for the command data. This means that we would hold the command data in memory twice:

  • Once in the LogEntry inside the AppendEntriesRequest
  • Once in the protocol buffers byte array returned from Marshal

There doesn’t appear to be any way to directly stream things. Maybe it is usually dealing with small amounts of data, maybe they didn’t notice, or maybe something in Go makes this very efficient, but I doubt it.

The next interesting part is Command handling. Raft is all about reaching a consensus on the order of executing a set of commands in a cluster. So it is really interesting to see it being handled with Go’s interfaces.

// Command represents an action to be taken on the replicated state machine.
type Command interface {
    CommandName() string
}

// CommandApply represents the interface to apply a command to the server.
type CommandApply interface {
    Apply(Context) (interface{}, error)
}

type CommandEncoder interface {
    Encode(w io.Writer) error
    Decode(r io.Reader) error
}

There is some additional code for serializing commands and reading them back, but nothing beyond this. The commands.go file, however, is a little more interesting. Let us look at the join command:

// Join command interface
type JoinCommand interface {
    Command
    NodeName() string
}

// Join command
type DefaultJoinCommand struct {
    Name             string `json:"name"`
    ConnectionString string `json:"connectionString"`
}


// The name of the Join command in the log
func (c *DefaultJoinCommand) CommandName() string {
    return "raft:join"
}

func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
    err := server.AddPeer(c.Name, c.ConnectionString)

    return []byte("join"), err
}

func (c *DefaultJoinCommand) NodeName() string {
    return c.Name
}

I’m not sure why we have an interface for JoinCommand, and then a default implementation like that. I saw that elsewhere in etcd, so it might be a Go pattern. Note that JoinCommand is an interface that embeds another interface (Command, in this case, obviously).
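For readers who, like me, are new to Go, here is a minimal sketch of what interface embedding buys you. The `join` and `nop` types and the `describe` function are hypothetical names for this example, not goraft code. Any type that has both methods satisfies `JoinCommand` implicitly, and code that only holds a `Command` can use a type assertion to ask whether it is also a `JoinCommand`.

```go
package main

import "fmt"

// Command and JoinCommand mirror the shape of the interfaces shown above.
type Command interface {
	CommandName() string
}

type JoinCommand interface {
	Command // embedding: every JoinCommand is also a Command
	NodeName() string
}

// join is a hypothetical implementation used only in this sketch.
type join struct{ name string }

func (j *join) CommandName() string { return "raft:join" }
func (j *join) NodeName() string    { return j.name }

// nop is a hypothetical command that is not a JoinCommand.
type nop struct{}

func (nop) CommandName() string { return "raft:nop" }

// Compile-time check that *join satisfies JoinCommand.
var _ JoinCommand = (*join)(nil)

// describe treats join commands specially by asserting on the wider interface.
func describe(c Command) string {
	if jc, ok := c.(JoinCommand); ok {
		return c.CommandName() + " from " + jc.NodeName()
	}
	return c.CommandName()
}

func main() {
	fmt.Println(describe(&join{name: "node1"}))
	fmt.Println(describe(nop{}))
}
```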

Note that you have the Apply function to actually handle the real work, in this case, adding a peer. There is nothing interesting in config.go, debug.go or context.go, but event.go is puzzling. To be fair, I am really at a loss to explain this style:

// Event represents an action that occurred within the Raft library.
// Listeners can subscribe to event types by using the Server.AddEventListener() function.
type Event interface {
    Type() string
    Source() interface{}
    Value() interface{}
    PrevValue() interface{}
}

// event is the concrete implementation of the Event interface.
type event struct {
    typ       string
    source    interface{}
    value     interface{}
    prevValue interface{}
}

// newEvent creates a new event.
func newEvent(typ string, value interface{}, prevValue interface{}) *event {
    return &event{
        typ:       typ,
        value:     value,
        prevValue: prevValue,
    }
}

// Type returns the type of event that occurred.
func (e *event) Type() string {
    return e.typ
}

// Source returns the object that dispatched the event.
func (e *event) Source() interface{} {
    return e.source
}

// Value returns the current value associated with the event, if applicable.
func (e *event) Value() interface{} {
    return e.value
}

// PrevValue returns the previous value associated with the event, if applicable.
func (e *event) PrevValue() interface{} {
    return e.prevValue
}

Why go to all this trouble to define things this way? It seems like a lot of boilerplate code. It would be easier to just expose a struct directly. I am assuming that this is done so you can send things other than the event struct, with additional information as well. In C# you’d do that by subclassing the event, but you cannot do that in Go. A better alternative might have been to just have a tag / state field in the struct and leave it at that, though.

event_dispatcher.go is just an implementation of a dictionary from event type (a string) to event listeners, nothing much beyond that. A lot of boilerplate code, too.
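Here is roughly what I have in mind when I say a plain struct and a dictionary would do. This is a sketch under my own assumptions, not the library's code: `Event` here is a struct rather than the interface above, and `dispatcher`, `Listener`, `AddListener` and `Dispatch` are hypothetical names. Because the struct is passed by value, each listener gets its own copy, so reassigning a field in one listener is not visible to the next one.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a plain struct with exported fields (an assumption of this sketch).
type Event struct {
	Type      string
	Value     interface{}
	PrevValue interface{}
}

// Listener is called with a copy of the event.
type Listener func(Event)

// dispatcher maps an event type to the listeners registered for it.
type dispatcher struct {
	mu        sync.RWMutex
	listeners map[string][]Listener
}

func newDispatcher() *dispatcher {
	return &dispatcher{listeners: make(map[string][]Listener)}
}

// AddListener registers l for events of the given type.
func (d *dispatcher) AddListener(typ string, l Listener) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.listeners[typ] = append(d.listeners[typ], l)
}

// Dispatch calls every listener registered for e.Type, in registration order.
// The listener slice is copied first so listeners run outside the lock.
func (d *dispatcher) Dispatch(e Event) {
	d.mu.RLock()
	ls := append([]Listener(nil), d.listeners[e.Type]...)
	d.mu.RUnlock()
	for _, l := range ls {
		l(e)
	}
}

func main() {
	d := newDispatcher()
	d.AddListener("termChange", func(e Event) {
		e.Value = 99 // only changes this listener's copy
	})
	d.AddListener("termChange", func(e Event) {
		fmt.Println(e.Type, e.PrevValue, "->", e.Value)
	})
	d.Dispatch(Event{Type: "termChange", Value: 2, PrevValue: 1})
}
```

Note that the copy is shallow. If Value holds a pointer, a listener can still mutate what it points to, which is the one thing the interface-with-getters style does not protect against either.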

http_transporter.go is next, and it dashes my hope that this would be a one-way messaging system. I’m thinking about doing Raft over ZeroMQ or NanoMSG. Here is the actual process of sending data over the wire:

// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
    var b bytes.Buffer
    if _, err := req.Encode(&b); err != nil {
        traceln("transporter.ae.encoding.error:", err)
        return nil
    }

    url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
    traceln(server.Name(), "POST", url)

    t.Transport.ResponseHeaderTimeout = server.ElectionTimeout()
    httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
    if httpResp == nil || err != nil {
        traceln("transporter.ae.response.error:", err)
        return nil
    }
    defer httpResp.Body.Close()

    resp := &AppendEntriesResponse{}
    if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
        traceln("transporter.ae.decoding.error:", err)
        return nil
    }

    return resp
}

This is very familiar territory for me, I have to say. Although, again, there is a lot of wasted memory here from encoding the data multiple times, instead of streaming it directly.

And here is how it receives information:

// Handles incoming AppendEntries requests.
func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        traceln(server.Name(), "RECV /appendEntries")

        req := &AppendEntriesRequest{}
        if _, err := req.Decode(r.Body); err != nil {
            http.Error(w, "", http.StatusBadRequest)
            return
        }

        resp := server.AppendEntries(req)
        if _, err := resp.Encode(w); err != nil {
            http.Error(w, "", http.StatusInternalServerError)
            return
        }
    }
}

Really, there is nothing much to write home about, to be frank. All of the operations are like that, just encoding/decoding and forwarding the call to the right function. I’m skipping log.go in favor of going to log_entry.go for a moment. The log is really important in Raft, so I want to focus on small chewables first.

If the user doesn’t provide an encoder for a command, it will be converted using JSON, then serialized to a writer using the protocol buffers format.

One interesting thing that I did notice was a bug in decoding from a protocol buffer stream:

// Decodes the log entry from a buffer. Returns the number of bytes read and
// any error that occurs.
func (e *LogEntry) Decode(r io.Reader) (int, error) {

    var length int
    _, err := fmt.Fscanf(r, "%8x\n", &length)
    if err != nil {
        return -1, err
    }

    data := make([]byte, length)
    _, err = r.Read(data)

    if err != nil {
        return -1, err
    }

    if err = proto.Unmarshal(data, e.pb); err != nil {
        return -1, err
    }

    return length + 8 + 1, nil
}

Do you see the bug?

It is in the reading of the data from the reader. A reader may return less data than was requested, and a single call to Read does not loop until the buffer is full. In this case, I’m assuming that the code always sends fully materialized readers to the Decode method, which is not surprising given how often it creates in-memory buffers for the entire dataset. Still, that isn’t nice to do, and it can create the most subtle and hard to understand bugs.

And now, into the Log!

// A log is a collection of log entries that are persisted to durable storage.
type Log struct {
    ApplyFunc   func(*LogEntry, Command) (interface{}, error)
    file        *os.File
    path        string
    entries     []*LogEntry
    commitIndex uint64
    mutex       sync.RWMutex
    startIndex  uint64 // the index before the first entry in the Log entries
    startTerm   uint64
}

// The results of the applying a log entry.
type logResult struct {
    returnValue interface{}
    err         error
}

There are a few things that we can notice right now. First, ApplyFunc is how we control the application of stuff to the in memory state, I am assuming. Given that applying the log can only happen after we have a consensus and probably fsynced to disk, it makes sense to invoke it from here.

Then, we also have a file, so that is where the interesting stuff actually happens, like the actual storage IO. The in-memory entries array is also interesting, mostly because I wonder just how big it gets, and when it is truncated. I think that the way it works, the log properties likely represent the flushed-to-disk state, and the entries represent the yet-to-be-flushed state.

Things get interesting in the open method, which is called to create a new log or recover an existing one. The interesting part (recovery) is here:

// Read the file and decode entries.
for {
    // Instantiate log entry and decode into it.
    entry, _ := newLogEntry(l, nil, 0, 0, nil)
    entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)

    n, err := entry.Decode(l.file)
    if err != nil {
        if err == io.EOF {
            debugln("open.log.append: finish ")
        } else {
            if err = os.Truncate(path, readBytes); err != nil {
                return fmt.Errorf("raft.Log: Unable to recover: %v", err)
            }
        }
        break
    }
    if entry.Index() > l.startIndex {
        // Append entry.
        l.entries = append(l.entries, entry)
        if entry.Index() <= l.commitIndex {
            command, err := newCommand(entry.CommandName(), entry.Command())
            if err != nil {
                continue
            }
            l.ApplyFunc(entry, command)
        }
        debugln("open.log.append log index ", entry.Index())
    }

    readBytes += int64(n)
}

This is really interesting, because it is actually sending the raw file to the Decode function, unlike what I expected. The reason this is surprising is that the OS is allowed to return less data than requested. As it turns out, on Windows this will never be the case, but it does appear (at least from the contract of the API it ends up calling) that on Linux it is possible. Now, I ended up going all the way to the syscall interface in Linux, and I’m pretty sure that this can’t happen there either, but still…

At any rate, the code appears to be pretty clear. We read the log entry, decode it (truncating the file if there are any issues) and, if need be, we apply it.

And I think that this is enough for now… it is close to 9 PM, and I need to do other things as well. I’ll get back to this in my next post.

Comments

Ben Johnson, 03/12/2014 03:28 AM

Thanks for doing such a thorough review of the code, Ayende. They're always really interesting reads. I wanted to add some comments for clarification.

  1. You hit the nail on the head with the memory efficiency issues. Raft has primarily been used for configuration management and relatively small data sizes so it hasn't been a huge issue so far. However, I actually need it for large amounts of data in the near future so I'm currently working on a successor to the library. I had a lot of success using zero-copy mmaps when I did a port of LMDB to Go recently so I'll probably use the same for go-raft 2.0. I'm also going to reuse buffers for reading off the socket and reference them directly.

  2. The protobuf generated code is checked in so that the library can be used via "go get". It can certainly look a little strange if you're doing a code review though. We have seen almost no changes in the protocol so I'll probably forgo protobufs and use a binary protocol for performance in go-raft 2.

  3. The JoinCommand/LeaveCommand interfaces were previously used as marker interfaces since they can require some special handling to ensure that leaving nodes actually receive the command. That code was removed at one point though so these interfaces should probably be removed as well.

  4. The aim of the Event interface was to make the fields read-only to the application using the library. It's probably overkill and a simpler struct would have been fine.

  5. The Raft paper (or at least as it existed in April 2013) was geared around request/response. If I remember correctly, Nicolas Trangez, who was working on a Haskell implementation, was discussing one way UDP-style communication but there were some ambiguities so I went with a simple request/response approach. I'd like to revisit this in the next version and provide a UDP or ZMQ option.

Thanks again for submitting the partial read bug to Github. It's fixed up now.

Ayende Rahien, 03/13/2014 12:59 PM

Ben,

1) It isn't so much for zero copy, it is that you are moving the data around so much, first from the user's data, then to json, then to protobuf, etc. And all of that is always happening on the full objects. I think a streaming approach is probably much better.

5) I'm sorry, but the code as written assumes a disconnected style. The actual transport implementation may be req/rep, but nothing in the actual code for Raft assumes that.
