Ayende @ Rahien

Refunds available at head office

Reviewing etcd

The etcd project is a project that I stumbled upon that looks interesting. It is a a highly-available key value store for shared configuration and service discovery. It is written in Go and is implemented using Raft. I’m reviewing commit  46d817f91b2edf4141081abff7d92a4f71d39248.

I don’t know Go, and I think that this would be a great way to learn both about Raft (which I am very interested about) and about Go (which I peeked at occasionally, but never really studied). Like some of my other posts, this is likely to be a very long and rambling one. For reading the code, I am using LiteIDE, at least for now.

This is what this looks like.


I usually like to do a lexicographical read through the codebase, at least at first. That means that in this case, I have to go through the docs first. Probably not a totally bad idea, but a divergence from my usual approach.

Discovery – it looks like etcd handled the problem of initial peers selection by… going to another etcd cluster, that handle membership information. There is a SaaS offering, it appears (discovery.etcd.io). I like the recursive nature of that, and obviously you can set it up with a static list of peers to start with.

The first real code file I saw was this one, bench.go:

   1: package main
   3: import (
   4:     "flag"
   5:     "log"
   6:     "strconv"
   8:     "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
   9: )
  11: func write(endpoint string, requests int, end chan int) {
  12:     client := etcd.NewClient([]string{endpoint})
  14:     for i := 0; i < requests; i++ {
  15:         key := strconv.Itoa(i)
  16:         _, err := client.Set(key, key, 0)
  17:         if err != nil {
  18:             println(err.Error())
  19:         }
  20:     }
  21:     end <- 1
  22: }
  24: func watch(endpoint string, key string) {
  25:     client := etcd.NewClient([]string{endpoint})
  27:     receiver := make(chan *etcd.Response)
  28:     go client.Watch(key, 0, true, receiver, nil)
  30:     log.Printf("watching: %s", key)
  32:     received := 0
  33:     for {
  34:         <-receiver
  35:         received++
  36:     }
  37: }
  39: func main() {
  40:     endpoint := flag.String("endpoint", "", "etcd HTTP endpoint")
  42:     rWrites := flag.Int("write-requests", 50000, "number of writes")
  43:     cWrites := flag.Int("concurrent-writes", 500, "number of concurrent writes")
  45:     watches := flag.Int("watches", 500, "number of writes")
  47:     flag.Parse()
  49:     for i := 0; i < *watches; i++ {
  50:         key := strconv.Itoa(i)
  51:         go watch(*endpoint, key)
  52:     }
  54:     wChan := make(chan int, *cWrites)
  55:     for i := 0; i < *cWrites; i++ {
  56:         go write(*endpoint, (*rWrites / *cWrites), wChan)
  57:     }
  59:     for i := 0; i < *cWrites; i++ {
  60:         <-wChan
  61:         log.Printf("Completed %d writes", (*rWrites / *cWrites))
  62:     }
  63: }

I include the entire file here because it is short, and really quite interesting. This is my first time really reading Go code, so I had to go and read some docs. The first interesting thing is in line 11, when when have “end chan int”, which defines a channel of integers. This allows cross goroutine (for .NET guys, think tasks / TPL, that isn’t actually accurate, but it is close enough) communication, including “waiting” for results.

The write func will write the specified number of requests, then push a number to the channel, signifying that it completed its work. That is really quite nice pattern for doing work, considering that there isn’t a way to await a goroutine.

The watch func is a little hard for me to get. Mostly because as far as I understand, it is setting up a watch for a particular key, then just accumulate the number of changes to it in a local variable, and not doing anything else with it.

The main function is really funny to read. The flag package is fascinating way to handle parameter parsing, and it shows how carefully Go was meant to be a server side language, where command line parsing is really common. The flag package is both powerful and really simple. I think that I’ll probably make use of this approach for configuration in RavenDB.

I love that you define the flag, and then you get a pointer to where that value will be, once you called flag.Parse();

Note that calls to go [expr] are equivalent for Task.Factory.StartNew([expr]) in .NET (not really, but close enough). The syntax <-wChan, for example, means “wait until there is a new value in the channel”. Presumably it also translate to something like “await channel.DequeueAsync()” in C#.

Next was the config package, where etcd is initializing itself. It was interesting to learn that you can have non global flags using the flag package, so you can use the same code for parsing arguments to a method. But that was about it. Nothing exciting there.

I’m skipping the contrib directory because there is no Go code there, and it doesn’t seems relevant for now. I’ll note that there is a mix of shell scripts, Python and json code in there, so I’m feeling good about ignoring that for now.

One thing that I really like in Go is that it is very easy to define “extension methods”, in fact, you usually appear to define structs (data holders), and then you just define a function that takes this as the base argument, and you can call it using method syntax. That makes some things very natural and nice. And it also give you really nice separation between data & behavior.

I also like the multiple return values option, which gives us a good pattern for reporting errors without getting either crazy syntax or throwing. It also make it clear when we want to ignore errors. Look at this:

   1: func (d *Discoverer) findPeers() (peers []string, err error) {
   2:     resp, err := d.client.Get(path.Join(d.prefix), false, true)
   3:     if err != nil {
   4:         return nil, err
   5:     }
   7:     node := resp.Node
   9:     if node == nil {
  10:         return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
  11:     }

Trying to do this in C, for example, would lead to an explosion of arrow head return values, or the complexities of “return zero for error, then get the actual issue from GetLatsError()”. Multiple return values results in a much nicer code than that.

The discovery protocol itself is defined in the docs ,but the code implementing it is really nice. Being able to piggy back on etcd itself to implement discovery is really nice.

The fixtures directory seems to be filled with certs files, I am not sure what for, but I’ll go directly to the http directory and see what is going on there. And there doesn’t appear to be anything much, so I’m moving on to the next thing that is actually meaningful.

The metrics section is interesting, mostly because we have been doing the same thing in RavenDB currently. But it all depends on an external package, so I’ll skip this for now. The next interesting thing is in the mod folder, where we have the dashboard (html5 app, not interested for me) module, and the leader & lock modules.

I’ll start with the leader module. Where we actually have interesting things. The leader module is actually very literally just proxying stuff to the lock module. It is getting a request, translating that request to a lock module http request, and execute that. Personally, I wouldn’t bother with doing this server side, and handle this entirely client side, or by calling the lock module methods directly, instead of proxying the request to the lock module. I am not sure why this approach was choosen:

   1: // getHandler retrieves the current leader.
   2: func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) error {
   3:     vars := mux.Vars(req)
   5:     // Proxy the request to the lock service.
   6:     url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"])
   7:     resp, err := h.client.Get(url)
   8:     if err != nil {
   9:         return err
  10:     }
  11:     defer resp.Body.Close()
  13:     w.WriteHeader(resp.StatusCode)
  14:     io.Copy(w, resp.Body)
  15:     return nil
  16: }

The lock module is where real stuff is happening. And at the same time, I am not sure at what level exactly this is happening. What appears to be happening is that the lock module, too, is built directly on top of the etcd client, rather than using it directly. This is strange to me, because that isn’t the way I would architect it, but I am guessing that this make it easier to work with things, having only a single real external API. On the other hand, having a server make http requests to itself seems very strange to me.

One thing that really confused me was a lot of references to things that are actually defined in another repository, the client side of etcd in go. Another interesting thing is the way Go implements interfaces. Instead of using explicit interfaces, if a type has all the methods for an interface, it is implementing that interface.

At this point I decided that I wanted a better IDE and spent some time getting IntelliJ to work with Go. It supports this, and you even get some reference tracking. I couldn’t get all of it to work, in particular, external reference weren’t tracked, and I didn’t really care to see why, so I just left it:


At any rate, I was reading the lock module code. In particular, I am not tracking the acquire_handler.go file. It has a major function (acquireHandler)* that actually handle the process of acquiring the lock.

* Sidenote, I like the structure of the code so far, in most files, we have one function, and some supporting functions to help it do some work. It is nice, simple and quite easy to follow.

The first thing that is done is syncing the cluster information. This is done by going to any of the machines that we already know about and asking them about the current state of the cluster. We take the first response, and presumably, since we are running server side, the first response would always be from us (assuming that the requests end up going to the leader). So there isn’t another machine boundary request, but it is still very strange to read it going through so much client operations.

This code is really interesting:

   2:     // Setup connection watcher.
   3:     closeNotifier, _ := w.(http.CloseNotifier)
   4:     closeChan := closeNotifier.CloseNotify()
   5:     stopChan := make(chan bool)

In C#, that would be setting up a cancellation token for the request being abandoned by the client or we completing some work.

   1: // If node exists then just watch it. Otherwise create the node and watch it.           
   2: node, index, pos := h.findExistingNode(keypath, value)                                  
   3: if index > 0 {                                                                          
   4:     if pos == 0 {                                                                          
   5:         // If lock is already acquired then update the TTL.                                   
   6:         h.client.Update(node.Key, node.Value, uint64(ttl))                                    
   7:     } else {                                                                               
   8:         // Otherwise watch until it becomes acquired (or errors).                             
   9:         err = h.watch(keypath, index, nil)                                                    
  10:     }                                                                                      
  11: } else {                                                                                
  12:     index, err = h.createNode(keypath, value, ttl, closeChan, stopChan)                    
  13: }                                                                                       

This is interesting, I am not really able to follow what is going on in the happen case (index > 0) yet. Let us lock at what happens with createNode…

   1: // createNode creates a new lock node and watches it until it is acquired or acquisition fails.
   2: func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-chan bool, stopChan chan bool) (int, error) {
   3:     // Default the value to "-" if it is blank.
   4:     if len(value) == 0 {
   5:         value = "-"
   6:     }
   8:     // Create an incrementing id for the lock.
   9:     resp, err := h.client.AddChild(keypath, value, uint64(ttl))
  10:     if err != nil {
  11:         return 0, err
  12:     }
  13:     indexpath := resp.Node.Key
  14:     index, _ := strconv.Atoi(path.Base(indexpath))
  16:     // Keep updating TTL to make sure lock request is not expired before acquisition.
  17:     go h.ttlKeepAlive(indexpath, value, ttl, stopChan)
  19:     // Watch until we acquire or fail.
  20:     err = h.watch(keypath, index, closeChan)
  22:     // Check for connection disconnect before we write the lock index.
  23:     if err != nil {
  24:         select {
  25:         case <-closeChan:
  26:             err = errors.New("user interrupted")
  27:         default:
  28:         }
  29:     }
  31:     // Update TTL one last time if acquired. Otherwise delete.
  32:     if err == nil {
  33:         h.client.Update(indexpath, value, uint64(ttl))
  34:     } else {
  35:         h.client.Delete(indexpath, false)
  36:     }
  38:     return index, err
  39: }

In line 9, we create a child of the key path. Assuming that the key path is foo, this will create an item with foo/1, foo/2, etc. Effectively an auto incrementing number (with no guarantees on the size of the step, mind).

In line 17 we make sure that we keep this alive, the ttlKeepAlive function is really fun to read:

   1: // ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
   2: func (h *handler) ttlKeepAlive(k string, value string, ttl int, stopChan chan bool) {
   3:     for {
   4:         select {
   5:         case <-time.After(time.Duration(ttl/2) * time.Second):
   6:             h.client.Update(k, value, uint64(ttl))
   7:         case <-stopChan:
   8:             return
   9:         }
  10:     }
  11: }

The C# translation for this would be:

   1: public async Task TtlKeepAlive(string k, string value, int ttl, CancelationToken t)
   2: {
   3:     while(true)
   4:     {
   5:         await Task.Delay(ttl, t);
   6:         if(t.IsCancelled)
   7:           return;
   8:         client.Update(k,value, ttl);
  10:     }
  11: }

But I really like this Go select syntax. It is very much like Erlang’s pattern matching on receive. At any rate, it seems that the magic happens in the watch method.

   1: // watch continuously waits for a given lock index to be acquired or until lock fails.
   2: // Returns a boolean indicating success.
   3: func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error {
   4:     // Wrap close chan so we can pass it to Client.Watch().
   5:     stopWatchChan := make(chan bool)
   6:     stopWrapChan := make(chan bool)
   7:     go func() {
   8:         select {
   9:         case <-closeChan:
  10:             stopWatchChan <- true
  11:         case <- stopWrapChan:
  12:             stopWatchChan <- true
  13:         case <- stopWatchChan:
  14:         }
  15:     }()
  16:     defer close(stopWrapChan)
  18:     for {
  19:         // Read all nodes for the lock.
  20:         resp, err := h.client.Get(keypath, true, true)
  21:         if err != nil {
  22:             return fmt.Errorf("lock watch lookup error: %s", err.Error())
  23:         }
  24:         nodes := lockNodes{resp.Node.Nodes}
  25:         prevIndex := nodes.PrevIndex(index)
  27:         // If there is no previous index then we have the lock.
  28:         if prevIndex == 0 {
  29:             return nil
  30:         }
  32:         // Watch previous index until it's gone.
  33:         waitIndex := resp.Node.ModifiedIndex
  35:         // Since event store has only 1000 histories we should use first node's CreatedIndex if available
  36:         if firstNode := nodes.First(); firstNode != nil {
  37:             waitIndex = firstNode.CreatedIndex
  38:         }
  40:         _, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
  41:         if err == etcd.ErrWatchStoppedByUser {
  42:             return fmt.Errorf("lock watch closed")
  43:         } else if err != nil {
  44:             return fmt.Errorf("lock watch error: %s", err.Error())
  45:         }
  46:     }
  47: }

I’ve to admit, this makes my head hurt, just a little bit.

But first, the defer syntax Go has is really nice. It is very similar to C#’s using statements, but it isn’t limited to just a specific interface, and it doesn’t introduce a nesting block.

To go routine in line 7 is interesting. It will wait for a notification from the close channel, the stop watch channel or the wrap channel. And it will forward all of those to the stop watch channel. I’m really not sure why this is the case, but let’s go with this for now.

The real interesting work happens in the for loop. We get all the keys in the specified key path. Note that we assume that we only have numeric keys in that “directory”. And we basically try to find if there is any value that is before our value.

The easiest way to think about it is in the same way you do when you wait in line in any government queue. You take a number, and you’re the first is there is no one with an earlier number than you.

The interesting bit is how Watch is handled. It is basically going to do a long poll request from the server, and the stopWatchChan is used to notify the Watch method when the user cancelled the request, so we don’t need this any longer. I’m really not sure why there is a need for stopWrapChan, but… at least now I understand what is going on here. We use the numbering system to effectively join a queue. Then we wait until we are at the head of the queue.

Let us go back to the actual acuireHandler routine, more specifically to the findExistingNode() behavior. If we specified a value, we try to find an existing entry in the path that already have this value. If there isn’t a value, we go back to the “take a number, wait” approach. If there is a value, however, I don’t following the logic. The findExistingNode() has three return values. The relevant node with the value, the index (the queue #, effectively), and the position of the specified node in the queue.

My problem is that I don’t understand the logic here. We find a node with the same value as we want, then we check that it is the first in the queue? What happens when we have two clients issuing the same request at the same time? I understand what happens, I don’t understand what the intention is.

As an aside, I think that I understand why a lot of the internal works in the lock module is done over the HTTP layer. The idea here is to only handle the distribution once. And if you route everything through the http interface, that would be it. This way, you don’t have to worry about how to handle consistencies, or stuff like that. And the idea is that you have a simple HTTP interface for a complex system like locking. My own preference would be to do this entirely client side, with no server side behavior, but that puts a lot of the onus on the clients, and it is easier to implement server side if you have a lot of clients for many environments.

Anyway, I think that I found the two pieces that really interests me:



Store is probably the on disk storage, something that is very near & dear to my heart. While server is the pieces that I’ll probably learn the most from…

I’ll start with going over the storage stuff, since that is probably the most familiar to me. Here is the interface for the store:

   1: type Store interface {
   2:     Version() int
   3:     CommandFactory() CommandFactory
   4:     Index() uint64
   6:     Get(nodePath string, recursive, sorted bool) (*Event, error)
   7:     Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
   8:     Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
   9:     Create(nodePath string, dir bool, value string, unique bool,
  10:         expireTime time.Time) (*Event, error)
  11:     CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  12:         value string, expireTime time.Time) (*Event, error)
  13:     Delete(nodePath string, recursive, dir bool) (*Event, error)
  14:     CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  16:     Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
  18:     Save() ([]byte, error)
  19:     Recovery(state []byte) error
  21:     TotalTransactions() uint64
  22:     JsonStats() []byte
  23:     DeleteExpiredKeys(cutoff time.Time)
  24: }

This is really interesting, because from the interface alone you can see some really interesting things. For example, The Save() method. That doesn’t match any transactional store interface that I can think of. To be fair, it looks to be more in the sense that this is used for snapshots than anything else, but still..

Those appear to be the core elements of the store:

   1: type store struct {                                                     
   2:     Root           *node                                                   
   3:     WatcherHub     *watcherHub                                             
   4:     CurrentIndex   uint64                                                  
   5:     Stats          *Stats                                                  
   6:     CurrentVersion int                                                     
   7:     ttlKeyHeap     *ttlKeyHeap  // need to recovery manually               
   8:     worldLock      sync.RWMutex // stop the world lock                     
   9: }                                                                       
  12: // node is the basic element in the store system.
  13: // A key-value pair will have a string value
  14: // A directory will have a children map
  15: type node struct {
  16:     Path string
  18:     CreatedIndex  uint64
  19:     ModifiedIndex uint64
  21:     Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
  23:     ExpireTime time.Time
  24:     ACL        string
  25:     Value      string           // for key-value pair
  26:     Children   map[string]*node // for directory
  28:     // A reference to the store this node is attached to.
  29:     store *store
  30: }

Again, the ability to return multiple values is really nice, see methods such as:

   1: // Read function gets the value of the node.
   2: // If the receiver node is not a key-value pair, a "Not A File" error will be returned.
   3: func (n *node) Read() (string, *etcdErr.Error) {
   4:     if n.IsDir() {
   5:         return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
   6:     }
   8:     return n.Value, nil
   9: }

This is quite lovely way to handle things.

However, looking at the Store directory, I am seeing a lot of stuff about modifying the in memory state, but nothing about persistence. I think that this is all handled via Raft. So I’ll just move away into reading the server side code now.

This is the Server interface:

   1: type Server interface {
   2:     State() string
   3:     Leader() string
   4:     CommitIndex() uint64
   5:     Term() uint64
   6:     PeerURL(string) (string, bool)
   7:     ClientURL(string) (string, bool)
   8:     Store() store.Store
   9:     Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
  10: }

And then we have actually handling requests. I choose to look at the simplest thing, just looking at how we process a read request:

   1: func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
   2:     vars := mux.Vars(req)
   3:     key := "/" + vars["key"]
   5:     // Help client to redirect the request to the current leader
   6:     if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
   7:         leader := s.Leader()
   8:         hostname, _ := s.ClientURL(leader)
  10:         url, err := url.Parse(hostname)
  11:         if err != nil {
  12:             log.Warn("Redirect cannot parse hostName ", hostname)
  13:             return err
  14:         }
  15:         url.RawQuery = req.URL.RawQuery
  16:         url.Path = req.URL.Path
  18:         log.Debugf("Redirect consistent get to %s", url.String())
  19:         http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect)
  20:         return nil
  21:     }
  23:     recursive := (req.FormValue("recursive") == "true")
  24:     sort := (req.FormValue("sorted") == "true")
  25:     waitIndex := req.FormValue("waitIndex")
  26:     stream := (req.FormValue("stream") == "true")
  28:     if req.FormValue("wait") == "true" {
  29:         return handleWatch(key, recursive, stream, waitIndex, w, s)
  30:     }
  32:     return handleGet(key, recursive, sort, w, s)
  33: }

If we are requiring consistency, and we aren’t the leader, we’ll forward to the leader. Otherwise, we’ll process the request. Let us assume for now that we are doing a simple get, not a watch, this gives us:

   1: func handleGet(key string, recursive, sort bool, w http.ResponseWriter, s Server) error {
   2:     event, err := s.Store().Get(key, recursive, sort)
   3:     if err != nil {
   4:         return err
   5:     }
   7:     writeHeaders(w, s)
   8:     b, _ := json.Marshal(event)
   9:     w.Write(b)
  10:     return nil
  11: }
  13: func writeHeaders(w http.ResponseWriter, s Server) {
  14:     w.Header().Set("Content-Type", "application/json")
  15:     w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
  16:     w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
  17:     w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
  18:     w.WriteHeader(http.StatusOK)
  19: }

As you can see, we are basically just getting the current state from the in memory store, and handle that. It would be more interesting to look at how we handle waiting for a value to change, however:

   1: func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, s Server) error {
   2:     // Create a command to watch from a given index (default 0).
   3:     var sinceIndex uint64 = 0
   4:     var err error
   6:     if waitIndex != "" {
   7:         sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
   8:         if err != nil {
   9:             return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
  10:         }
  11:     }
  13:     watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
  14:     if err != nil {
  15:         return err
  16:     }
  18:     cn, _ := w.(http.CloseNotifier)
  19:     closeChan := cn.CloseNotify()
  21:     writeHeaders(w, s)
  23:     if stream {
  24:         // watcher hub will not help to remove stream watcher
  25:         // so we need to remove here
  26:         defer watcher.Remove()
  27:         for {
  28:             select {
  29:             case <-closeChan:
  30:                 return nil
  31:             case event, ok := <-watcher.EventChan:
  32:                 if !ok {
  33:                     // If the channel is closed this may be an indication of
  34:                     // that notifications are much more than we are able to
  35:                     // send to the client in time. Then we simply end streaming.
  36:                     return nil
  37:                 }
  39:                 b, _ := json.Marshal(event)
  40:                 _, err := w.Write(b)
  41:                 if err != nil {
  42:                     return nil
  43:                 }
  44:                 w.(http.Flusher).Flush()
  45:             }
  46:         }
  47:     }
  49:     select {
  50:     case <-closeChan:
  51:         watcher.Remove()
  52:     case event := <-watcher.EventChan:
  53:         b, _ := json.Marshal(event)
  54:         w.Write(b)
  55:     }
  56:     return nil
  57: }

The store’s Watch() method is actually interesting, because it exposes some interesting Go concepts (handling full channels, channels for communications, etc). But the important thing is that this will simply wait for a change to happen in the in memory state, and when such a thing happens, it will put a value in the wathcer.EventChan channel. So the logic here goes like this:

  • Setup a watch on the in memory state.
  • Wait for a:
    • Change in the items we watch
    • Or user abandoning the request

There is some interesting stuff here regarding one time watch, or streaming watch, but that appears to be quite easy to figure out and follow what is going on.

One thing that I can tell you, from my own experience, is that I would actually expect this to have serious issues in productions. In particular, web servers can decide that this request takes too long, and just abort it (for example IIS behaves in this manner), or that it timed out. That obviously depends on the server side implementation, and I’m willing to assume that this isn’t the case for whatever http stack etcd uses. However, clients do not. Most clients would give you the benefit of the doubt, but they would abort the request after a while, usually 15 seconds. That might be okay for some purposes, but especially if you want to handling streaming, that isn’t really going to cut it.

More to the point, for long requests, this can cause issues for proxies, firewalls, etc. They’ll decide that the request is closed, and shut it down even if you handled it on both ends properly. With RavenDB, we have a remarkably similar system, but our streaming notifications also incorporate the idea of heartbeat messages. Those are sent every now and then strictly in order to make sure that you’ll get something client side, and that will make all the infrastructure, client side code, etc much much happier.

Enough with the small stuff, let us look at how we handle more complex things. I now intend to take a look at the POST handler. POST operations in etcd has the following format:

curl -XPOST -d value=Job1

The idea is that this will create an automatically named key such as queue/15, queue/853, etc. The POST handler is interesting, because here it is in its entirety.

   2: func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
   3:     vars := mux.Vars(req)
   4:     key := "/" + vars["key"]
   6:     value := req.FormValue("value")
   7:     dir := (req.FormValue("dir") == "true")
   8:     expireTime, err := store.TTL(req.FormValue("ttl"))
   9:     if err != nil {
  10:         return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
  11:     }
  13:     c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, true)
  14:     return s.Dispatch(c, w, req)
  15: }

The CreateCreateCommand just create a data object that holds the parameters, and then we dispatch it. I’m thinking that we can learn quite a lot from how that works.

Dispatch merely send the command to the leader. This is the relevant code if we are not the leader:

   1: leader := ps.raftServer.Leader()                                         
   3: // No leader available.                                                  
   4: if leader == "" {                                                        
   5:     return etcdErr.NewError(300, "", s.Store().Index())                     
   6: }                                                                        
   8: var url string                                                           
   9: switch c.(type) {                                                        
  10: case *JoinCommand, *RemoveCommand:                                       
  11:     url, _ = ps.registry.PeerURL(leader)                                    
  12: default:                                                                 
  13:     url, _ = ps.registry.ClientURL(leader)                                  
  14: }                                                                        
  15: uhttp.Redirect(url, w, req)                                              
  17: return nil                                                               

So basically, if there isn’t a leader, we error. That can happen if we have a network split and we are in the minority portion, for example. But usually we’ll just redirect you to the right server to use. But here is the interesting part, where we are the leader, and get to do stuff:

   1: result, err := ps.raftServer.Do(c)
   2: if err != nil {
   3:     return err
   4: }
   6: if result == nil {
   7:     return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
   8: }
  10: // response for raft related commands[join/remove]
  11: if b, ok := result.([]byte); ok {
  12:     w.WriteHeader(http.StatusOK)
  13:     w.Write(b)
  14:     return nil
  15: }
  17: var b []byte
  18: if strings.HasPrefix(req.URL.Path, "/v1") {
  19:     b, _ = json.Marshal(result.(*store.Event).Response(0))
  20:     w.WriteHeader(http.StatusOK)
  21: } else {
  22:     e, _ := result.(*store.Event)
  23:     b, _ = json.Marshal(e)
  25:     w.Header().Set("Content-Type", "application/json")
  26:     // etcd index should be the same as the event index
  27:     // which is also the last modified index of the node
  28:     w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index()))
  29:     w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
  30:     w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
  32:     if e.IsCreated() {
  33:         w.WriteHeader(http.StatusCreated)
  34:     } else {
  35:         w.WriteHeader(http.StatusOK)
  36:     }
  37: }
  39: w.Write(b)
  41: return nil

The ps variable is something called a PeerServer, and I haven’t check it yet. But all of this code is basically doing is: “send this to raft to Do something about it, then reply to the caller”. So let us look at what we are actually doing there. The Do method merely call the send() method, which looks like this:

   1: // Sends an event to the event loop to be processed. The function will wait
   2: // until the event is actually processed before returning.
   3: func (s *server) send(value interface{}) (interface{}, error) {
   4:     event := &ev{target: value, c: make(chan error, 1)}
   5:     s.c <- event
   6:     err := <-event.c
   7:     return event.returnValue, err
   8: }

Personally, I think that this is very interesting, and again, very much like the way you would structure an Erlang system. Of particular interest is the idea of event loop. That would be the s.c channel, and I assume that this is meant for a separate goroutine that is processing work on top of that. We ended up with transaction merging in Voron using pretty much the same system.

The s.c channel is a channel of ev pointers. And ev is defined as:

   2: // An internal event to be processed by the server's event loop.
   3: type ev struct {
   4:     target      interface{}
   5:     returnValue interface{}
   6:     c           chan error
   7: }

The interface{} definition it Go’s System.Object, basically. And the error channel is there to mark when we are done processing the event, I assume. I would structure it so we can send a null error as completion, and I bet that this is how this is done.

I’m currently assuming that this is being read from the loop() method. And while I usually don’t just comment on comments, this one is really nice:

   2: //--------------------------------------
   3: // Event Loop
   4: //--------------------------------------
   6: //               ________
   7: //            --|Snapshot|                 timeout
   8: //            |  --------                  ______
   9: // recover    |       ^                   |      |
  10: // snapshot / |       |snapshot           |      |
  11: // higher     |       |                   v      |     recv majority votes
  12: // term       |    --------    timeout    -----------                        -----------
  13: //            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
  14: //                 --------               -----------                        -----------
  15: //                    ^          higher term/ |                         higher term |
  16: //                    |            new leader |                                     |
  17: //                    |_______________________|____________________________________ |

This comment promises an interesting function to read…

   2: func (s *server) loop() {
   3:     defer s.debugln("server.loop.end")
   5:     for {
   6:         state := s.State()
   8:         s.debugln("server.loop.run ", state)
   9:         switch state {
  10:         case Follower:
  11:             s.followerLoop()
  13:         case Candidate:
  14:             s.candidateLoop()
  16:         case Leader:
  17:             s.leaderLoop()
  19:         case Snapshotting:
  20:             s.snapshotLoop()
  22:         case Stopped:
  23:             s.stopped <- true
  24:             return
  25:         }
  26:     }
  27: }

Let us start by looking at the leaderLoop behavior:

   1: func (s *server) leaderLoop() {
   2:     s.setState(Leader)
   3:     logIndex, _ := s.log.lastInfo()
   5:     // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
   6:     s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
   7:     for _, peer := range s.peers {
   8:         peer.setPrevLogIndex(logIndex)
   9:         peer.startHeartbeat()
  10:     }
  12:     // Commit a NOP after the server becomes leader. From the Raft paper:
  13:     // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
  14:     // each server; repeat during idle periods to prevent election timeouts
  15:     // (§5.2)". The heartbeats started above do the "idle" period work.
  16:     go s.Do(NOPCommand{})
  18:     // Begin to collect response from followers
  19:     for s.State() == Leader {
  20:         var err error
  21:         select {
  22:         case e := <-s.c:
  23:             if e.target == &stopValue {
  24:                 // Stop all peers before stop
  25:                 for _, peer := range s.peers {
  26:                     peer.stopHeartbeat(false)
  27:                 }
  28:                 s.setState(Stopped)
  29:             } else {
  30:                 switch req := e.target.(type) {
  31:                 case Command:
  32:                     s.processCommand(req, e)
  33:                     continue
  34:                 case *AppendEntriesRequest:
  35:                     e.returnValue, _ = s.processAppendEntriesRequest(req)
  36:                 case *AppendEntriesResponse:
  37:                     s.processAppendEntriesResponse(req)
  38:                 case *RequestVoteRequest:
  39:                     e.returnValue, _ = s.processRequestVoteRequest(req)
  40:                 }
  41:             }
  43:             // Callback to event.
  44:             e.c <- err
  45:         }
  46:     }
  48:     s.syncedPeer = nil
  49: }

To be perfectly frank, this is really code code. I am loving the structure here. It is really fun to go through and figure out. And the code follows really closely the Raft paper. And… it appears that at some point I actually moved off the etcd code and into the go-raft codebase. I think that I’ll skip doing the Raft stuff for this blog post. It is long enough already, and just focus on the etcd stuff for now.

The part that we really care for this blog post about is the processCommand call:

   1: // Processes a command.
   2: func (s *server) processCommand(command Command, e *ev) {
   3:     s.debugln("server.command.process")
   5:     // Create an entry for the command in the log.
   6:     entry, err := s.log.createEntry(s.currentTerm, command, e)
   8:     if err != nil {
   9:         s.debugln("server.command.log.entry.error:", err)
  10:         e.c <- err
  11:         return
  12:     }
  14:     if err := s.log.appendEntry(entry); err != nil {
  15:         s.debugln("server.command.log.error:", err)
  16:         e.c <- err
  17:         return
  18:     }
  20:     s.syncedPeer[s.Name()] = true
  21:     if len(s.peers) == 0 {
  22:         commitIndex := s.log.currentIndex()
  23:         s.log.setCommitIndex(commitIndex)
  24:         s.debugln("commit index ", commitIndex)
  25:     }
  26: }

In createEntry, we create effectively serialize the command into JSON, and appenEntry writes it to a file. (I finally found the serialize format, it is JSON for the commands, wrapped in a protobuf envelop). As an aside, if this was a C# code, I would be very worried about the cost of all those allocations. The data is first moved to a JSON buffer, then into a protocol buffer entry, where it is marshaled into another buffer, and only then is it written to the actual file. That is pretty prevalent in the codebase, to be honest. But again, this is Raft stuff that is going on, not etcd stuff. So we’ll ignore this for now and try to see where we actually get to apply the command against our own internal state.

I had to go through some hoops to figure it out. In particular, commands are applied during recovery, or when we are actually committing the state following a quorum, and this is happening in the Log.ApplyFunc, which is setup externally, and… Anyway, what we actually do is this:

   1: // Apply command to the state machine.                                 
   2: switch c := c.(type) {                                                 
   3: case CommandApply:                                                     
   4:     return c.Apply(&context{                                              
   5:         server:       s,                                                     
   6:         currentTerm:  s.currentTerm,                                         
   7:         currentIndex: s.log.internalCurrentIndex(),                          
   8:         commitIndex:  s.log.commitIndex,                                     
   9:     })                                                                    
  10: case deprecatedCommandApply:                                           
  11:     return c.Apply(s)                                                     
  12: default:                                                               
  13:     return nil, fmt.Errorf("Command does not implement Apply()")          
  14: }                                                                      

And that goes all the way back to the CreateCommand’s Apply function, which does:

   1: // Create node                                                                                      
   2: func (c *CreateCommand) Apply(context raft.Context) (interface{}, error) {                          
   3:     s, _ := context.Server().StateMachine().(store.Store)                                              
   5:     e, err := s.Create(c.Key, c.Dir, c.Value, c.Unique, c.ExpireTime)                                  
   7:     if err != nil {                                                                                    
   8:         log.Debug(err)                                                                                    
   9:         return nil, err                                                                                   
  10:     }                                                                                                  
  12:     return e, nil                                                                                      
  13: }                                                                                                   

So, basically, we have Raft that does the hard work of getting a Quorum, persistence, etc.  The etcd server is responsible for the in memory state, defining commands, etc.

The really interesting part from my perspective is that we need to process erroneous entries as well, in the same manner. For example, let us say that I want to create a new entry, but only if it isn’t already there. The way it works, even though I know that this would be an error, I have to run this through Raft, get a consensus that we can apply this command, and then we apply the command, see that it is wrong, and return an error. That error leaves no state changes, but it still had to go through the Raft process, it is going to be in the log forever, etc. I’m guessing that the percentage of erroneous commands is low, to be able to tolerate that.

And, at any rate. That pretty much conclude my review of etcd. It comes to about 20 pages or so, according to my math, and that is quite enough. On the other hand, it might have been 7 posts, instead. I would really like to get some feedback on which option you like more, dear reader.

Next, I’m going to go over go-raft, I have some thoughts about this, but I’ll keep them for my next post.

As a side note. I am not, by any means, an experience Go developer. I haven’t even read Go code beyond Hello World before starting reading the etcd codebase. But I can tell you that this is a very nice codebase to look at. It is clear, nicely laid out, it is possible to go through everything and understand what is going on easily.


03/07/2014 10:38 AM by

I liked it in one go - just like this

03/07/2014 10:44 AM by

Great post, as usual. Btw, line 11 reminds me more sth like 'yield return'.

Jason Meckley
03/07/2014 02:58 PM by
Jason Meckley

I prefer the multi-post approach where you start with the summarizing ideas and then dig into the details. I find this easier to digest.

One option is to write a series of posts and release them simultaneously. The "House of Cards" release cycle :)

03/07/2014 03:03 PM by

Great post as usual specially like the analogy with C# TPL and Async but wanted to clarify that defer is not like using statement, but it is more like try- finally , defer func will execute func just before return of function executing the defer. Hope this helps.

Jake Scott
03/07/2014 07:53 PM by
Jake Scott

There is a good visualisation of Raft here http://thesecretlivesofdata.com/raft/

Comments have been closed on this topic.