My Oredev talk is now available here:
The Raft protocol gives us a stable replicated distributed log. In other words, all servers in the cluster will agree on all the committed entries to the log (both what they are, and in what position). We usually fill the log with operations that a state machine will execute.
In the Tail/Feather example, the commands are set/del operations on the key value store. Note that this doesn’t mean that all servers will always have the same state. It is possible that a server (or set of servers) will have an outdated view of the log, but the log that they have will match up to the point that they have it.
So, what is the problem? What happens when we have an active system? Well, every time that we make a modification, we’ll add it to the log. That is all good and great, but what about the actual log? Well, it is going to stay there, we need it so we can catch up any new server that will join the cluster. But that means that over time, we are going to have an unbounded growth. Which isn’t a very nice thing to have.
Rachis handles this by asking the state machine to implement snapshots: a way to take the current state of the state machine and transmit it over the network. For example, assume that we have a log full of these entries:
{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 2}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 3}
// ...
{ Op: "Add", Key: "users/1/login-attempts", "Value": 300000}
The log for that is 300,000 entries long, but the current state of the machine is just:
{ "users/1/login-attempts": 300000 }
Which is obviously much smaller. Rachis doesn’t force a state machine to implement this, but if it isn’t doing so, we can never clear the log. But implementing snapshots has its own problems.
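To make the size difference concrete, here is a minimal sketch (not Rachis code) that replays such a log into a dictionary. The `Replay` helper and the tuple shape are made up for this illustration, and it assumes that an Add simply overwrites the previous value for the key.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: fold a log of "Add" operations into the current state.
static Dictionary<string, long> Replay(IEnumerable<(string Key, long Value)> entries)
{
    var result = new Dictionary<string, long>();
    foreach (var (key, value) in entries)
        result[key] = value; // a later entry overwrites an earlier one for the same key
    return result;
}

// 300,000 log entries, all of them touching the same key
var log = Enumerable.Range(1, 300000)
    .Select(i => ("users/1/login-attempts", (long)i));

var snapshotState = Replay(log);
Console.WriteLine(snapshotState.Count);                     // number of keys we would have to snapshot
Console.WriteLine(snapshotState["users/1/login-attempts"]); // the latest value for the key
```

The snapshot only has to carry what is in `snapshotState`, no matter how many entries were needed to get there.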
What about the actual cost of creating the snapshot? Imagine that we ask the state machine for a snapshot every 10,000 entries. In the example above, that would mean just writing out { "users/1/login-attempts": 300000 } or whatever the actual current value is. But now consider a log that looks like this instead:
{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/2/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/3/login-attempts", "Value": 1}
// ...
{ Op: "Add", Key: "users/300000/login-attempts", "Value": 1}
Note that instead of having 300,000 changes to the same key, we are going to have 300,000 keys. In this case, writing the full list down on every snapshot is very expensive. That is what incremental backups are here to solve. We let Voron know that this is what we want by specifying:
options.IncrementalBackupEnabled = true;
And now it is time to define policies about taking snapshots. We are going to handle this using Voron full & incremental snapshots. You can see the logic in the following code.
public void CreateSnapshot(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    // if we have no snapshot files, this is the first time that we create a snapshot
    // we handle that by asking voron to create a full backup
    var files = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");
    Array.Sort(files, StringComparer.OrdinalIgnoreCase); // make sure we get it in sort order
    if (files.Any() == false)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    string lastFullBackup = null;
    int fullBackupIndex = -1;
    for (int i = files.Length - 1; i >= 0; i--)
    {
        // Directory.GetFiles returns paths that include the directory, so check the file name only
        if (!Path.GetFileName(files[i]).StartsWith("Full"))
            continue;
        fullBackupIndex = i;
        lastFullBackup = files[i];
        break;
    }
    if (lastFullBackup == null)
    {
        // this shouldn't be the case, we must always have at least one full backup.
        // maybe user deleted it? We'll do a full backup here to compensate
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    var fullBackupSize = new FileInfo(lastFullBackup).Length;
    var incrementalBackupsSize = files.Skip(fullBackupIndex + 1).Sum(f => new FileInfo(f).Length);
    // now we need to decide whether to do a full or incremental backup, doing incremental backups stops
    // making sense if they will take more space than the full backup. Our cutoff point is when they pass 50%
    // of the size of the full backup.
    // If full backup size is 1 GB, and we have 25 incremental backups that are 600 MB in size, we need to transfer
    // 1.6 GB to restore. If we generate a new full backup, we'll only need to transfer 1 GB to restore.
    if (incrementalBackupsSize > fullBackupSize / 2)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    DeleteOldSnapshots(files.Take(fullBackupIndex)); // delete snapshots older than the current full backup
    var incrementalBackup = new IncrementalBackup();
    incrementalBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Inc-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set);
}

private void DoFullBackup(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    var snapshotsToDelete = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");
    var fullBackup = new FullBackup();
    fullBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Full-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set
        );
    DeleteOldSnapshots(snapshotsToDelete);
}

private static void DeleteOldSnapshots(IEnumerable<string> snapshotsToDelete)
{
    foreach (var snapshot in snapshotsToDelete)
    {
        try
        {
            File.Delete(snapshot);
        }
        catch (Exception)
        {
            // we ignore snapshots we can't delete, they are expected if we are concurrently writing
            // the snapshot and creating a new one. We'll get them the next time.
        }
    }
}
Basically, we need to strike a balance between full and incremental backups. We do that by first taking a full backup, and then taking incremental backups until our incremental backups take more than 50% of the size of the full backup, at which point we are probably better off doing another full backup. Note that we use the event of a full backup to clear the old incremental and full backup files.
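The cutoff itself is a one-line decision. Here it is as a standalone sketch; `ShouldTakeFullBackup` is a hypothetical helper name, and the sizes are made-up numbers that follow the 1 GB example from the code comments.

```csharp
using System;

// Hypothetical helper mirroring the policy described above: take a new full backup once
// the incremental backups add up to more than half the size of the last full backup.
static bool ShouldTakeFullBackup(long fullBackupSize, long incrementalBackupsSize)
{
    return incrementalBackupsSize > fullBackupSize / 2;
}

const long MB = 1024 * 1024;
Console.WriteLine(ShouldTakeFullBackup(1024 * MB, 100 * MB)); // False: the incrementals are still small
Console.WriteLine(ShouldTakeFullBackup(1024 * MB, 600 * MB)); // True: 600 MB is more than half of 1 GB
```

The 50% figure is a judgment call: a lower cutoff means smaller restores but more frequent full backups.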
And with that, we can move to actually sending the snapshot over the wire. This is exposed by the GetSnapshotWriter() method, which just hands all the responsibility over to the SnapshotWriter:
public ISnapshotWriter GetSnapshotWriter()
{
    return new SnapshotWriter(this);
}

public class SnapshotWriter : ISnapshotWriter
{
    private readonly KeyValueStateMachine _parent;
    private List<FileStream> _files = new List<FileStream>();

    public SnapshotWriter(KeyValueStateMachine parent)
    {
        _parent = parent;
        var files = Directory.GetFiles(_parent._storageEnvironment.Options.BasePath, "*.Snapshot");
        // Directory.GetFiles doesn't guarantee any order, so sort the same way CreateSnapshot() does
        Array.Sort(files, StringComparer.OrdinalIgnoreCase);
        // GetFullBackupIndex() isn't shown here, it returns the position of the latest full backup file, or -1
        var fullBackupIndex = GetFullBackupIndex(files);
        if (fullBackupIndex == -1)
            throw new InvalidOperationException("Could not find a full backup file to start the snapshot writing");
        var last = Path.GetFileNameWithoutExtension(files[files.Length-1]);
        Debug.Assert(last != null);
        var parts = last.Split('-');
        if(parts.Length != 3)
            throw new InvalidOperationException("Invalid snapshot file name " + files[files.Length - 1] + ", could not figure out index & term");
        Index = long.Parse(parts[1]);
        Term = long.Parse(parts[2]);
        for (int i = fullBackupIndex; i < files.Length; i++)
        {
            _files.Add(File.OpenRead(files[i]));
        }
    }

    public void Dispose()
    {
        foreach (var file in _files)
        {
            file.Dispose();
        }
    }

    public long Index { get; private set; }
    public long Term { get; private set; }

    public void WriteSnapshot(Stream stream)
    {
        var writer = new BinaryWriter(stream);
        writer.Write(_files.Count);
        foreach (var file in _files)
        {
            // FileStream.Name is the full path, the receiving side only needs the file name
            writer.Write(Path.GetFileName(file.Name));
            writer.Write(file.Length);
            writer.Flush();
            file.CopyTo(stream);
        }
    }
}
What is going on here? We get the snapshot files and find the latest full backup, then we open all the files that we’ll need for the snapshot (the last full backup and everything afterward). We need to open them in the constructor so that the CreateSnapshot() method cannot delete them while we are still sending them.
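The index and term travel in the file name, so reading them back is just string splitting. A minimal standalone sketch of that step follows; `ParseSnapshotName` is a hypothetical helper name, and the `data` directory and the 20000 / 3 values are made up.

```csharp
using System;
using System.IO;

// Hypothetical helper: recover (index, term) from a "<Kind>-<index>-<term>.Snapshot" file name.
static (long Index, long Term) ParseSnapshotName(string path)
{
    var name = Path.GetFileNameWithoutExtension(path);
    var parts = name.Split('-');
    if (parts.Length != 3)
        throw new FormatException("Invalid snapshot file name " + path);
    return (long.Parse(parts[1]), long.Parse(parts[2]));
}

// Build a name with the same format string the backup code uses, then read it back.
var fileName = string.Format("Inc-{0:D19}-{1:D19}.Snapshot", 20000L, 3L);
var parsed = ParseSnapshotName(Path.Combine("data", fileName));
Console.WriteLine(fileName);
Console.WriteLine(parsed.Index + " / " + parsed.Term);
```

The D19 padding is what makes a plain string sort of the file names match the numeric order of the indexes.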
Then we just concatenate them all and send them over the wire. And getting them? That is pretty easy as well:
public void ApplySnapshot(long term, long index, Stream stream)
{
    var basePath = _storageEnvironment.Options.BasePath;
    _storageEnvironment.Dispose();
    foreach (var file in Directory.EnumerateFiles(basePath))
    {
        File.Delete(file);
    }
    var files = new List<string>();
    var buffer = new byte[1024*16];
    var reader = new BinaryReader(stream);
    var filesCount = reader.ReadInt32();
    if (filesCount == 0)
        throw new InvalidOperationException("Snapshot cannot contain zero files");
    for (int i = 0; i < filesCount; i++)
    {
        var name = reader.ReadString();
        files.Add(name);
        var len = reader.ReadInt64();
        using (var file = File.Create(Path.Combine(basePath, name)))
        {
            file.SetLength(len);
            long totalFileRead = 0; // long, so files larger than 2 GB don't overflow the counter
            while (totalFileRead < len)
            {
                var read = stream.Read(buffer, 0, (int) Math.Min(buffer.Length, len - totalFileRead));
                if (read == 0)
                    throw new EndOfStreamException();
                totalFileRead += read;
                file.Write(buffer, 0, read);
            }
        }
    }
    new FullBackup().Restore(Path.Combine(basePath, files[0]), basePath);
    var options = StorageEnvironmentOptions.ForPath(basePath);
    options.IncrementalBackupEnabled = true;
    //TODO: Copy any other customizations that might have happened on the options
    new IncrementalBackup().Restore(options, files.Skip(1));
    // note: this requires that the _storageEnvironment field isn't readonly
    _storageEnvironment = new StorageEnvironment(options);
    using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
    {
        var metadata = tx.ReadTree("$metadata");
        metadata.Add("last-index", EndianBitConverter.Little.GetBytes(index));
        LastAppliedIndex = index;
        tx.Commit();
    }
}
We unpack the snapshot files from the stream, apply the full backup first, and then apply all the incremental backups. Make sure to update the last applied index, and we are set.
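The wire format is simple enough to try in isolation: a file count, then for each file its name, its length and its raw bytes. Here is a minimal round trip over a MemoryStream. `WriteFiles` and `ReadFiles` are hypothetical helpers, the file contents are made-up bytes, and the sketch buffers each file in memory, which the real code avoids by streaming.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Writer side: file count, then name + length + content for every file.
static void WriteFiles(Stream output, List<(string Name, byte[] Content)> files)
{
    var writer = new BinaryWriter(output, Encoding.UTF8, leaveOpen: true);
    writer.Write(files.Count);               // Int32
    foreach (var (name, content) in files)
    {
        writer.Write(name);                  // length-prefixed string
        writer.Write((long)content.Length);  // Int64
        writer.Write(content);
    }
    writer.Flush();
}

// Reader side: the exact mirror image of the writer.
static List<(string Name, byte[] Content)> ReadFiles(Stream input)
{
    var reader = new BinaryReader(input, Encoding.UTF8, leaveOpen: true);
    var count = reader.ReadInt32();
    var result = new List<(string Name, byte[] Content)>();
    for (var i = 0; i < count; i++)
    {
        var name = reader.ReadString();
        var length = reader.ReadInt64();
        var content = reader.ReadBytes((int)length); // good enough for a sketch, real snapshots should be copied in chunks
        if (content.Length != length)
            throw new EndOfStreamException();
        result.Add((name, content));
    }
    return result;
}

var snapshot = new List<(string Name, byte[] Content)>
{
    (string.Format("Full-{0:D19}-{1:D19}.Snapshot", 10, 1), new byte[] { 1, 2, 3 }),
    (string.Format("Inc-{0:D19}-{1:D19}.Snapshot", 20, 1), new byte[] { 4, 5 }),
};

var wire = new MemoryStream();
WriteFiles(wire, snapshot);
wire.Position = 0;

var received = ReadFiles(wire);
Console.WriteLine(received.Count);
Console.WriteLine(received[1].Name);
```

Writing the length before the content is what lets the reader know where one file ends and the next begins without any delimiter.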
As I mentioned, Tail/Feather is a weekend project to test out how stuff works for real. After creating the highly available distributed key/value store, we now need to actually build a client API for it.
Externally, that API is going to look like this:
public class TailFeatherClient : IDisposable
{
public TailFeatherClient(params Uri[] nodes);
public Task Set(string key, JToken value);
public Task<JToken> Get(string key);
public Task Remove(string key);
public void Dispose();
}
If this wasn’t a weekend project, I would add batch support, but that isn’t important for our purposes right now. The API itself is pretty stupid, which is great, but what about the actual behavior?
We want it to be able to handle dynamic cluster changes, and we need it to be smart about it. A lot of that is shared among all operations, so the next layer of the API is:
public Task Set(string key, JToken value)
{
return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/set?key={0}&val={1}",
Uri.EscapeDataString(key), Uri.EscapeDataString(value.ToString(Formatting.None)))));
}
public async Task<JToken> Get(string key)
{
var reply = await ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/read?key={0}",
Uri.EscapeDataString(key))));
var result = JObject.Load(new JsonTextReader(new StreamReader(await reply.Content.ReadAsStreamAsync())));
if (result.Value<bool>("Missing"))
return null;
return result["Value"];
}
public Task Remove(string key)
{
return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/del?key={0}",
Uri.EscapeDataString(key))));
}
The actual behavior is in ContactServer:
private readonly ConcurrentDictionary<Uri, HttpClient> _cache = new ConcurrentDictionary<Uri, HttpClient>();
private Task<TailFeatherTopology> _topologyTask;
public TailFeatherClient(params Uri[] nodes)
{
_topologyTask = FindLatestTopology(nodes);
}
private HttpClient GetHttpClient(Uri node)
{
return _cache.GetOrAdd(node, uri => new HttpClient { BaseAddress = uri });
}
private async Task<TailFeatherTopology> FindLatestTopology(IEnumerable<Uri> nodes)
{
var tasks = nodes.Select(node => GetHttpClient(node).GetAsync("tailfeather/admin/flock")).ToArray();
await Task.WhenAny(tasks);
var topologies = new List<TailFeatherTopology>();
foreach (var task in tasks)
{
var message = task.Result;
if (message.IsSuccessStatusCode == false)
continue;
topologies.Add(new JsonSerializer().Deserialize<TailFeatherTopology>(
new JsonTextReader(new StreamReader(await message.Content.ReadAsStreamAsync()))));
}
return topologies.OrderByDescending(x => x.CommitIndex).FirstOrDefault();
}
private async Task<HttpResponseMessage> ContactServer(Func<HttpClient, Task<HttpResponseMessage>> operation, int retries = 3)
{
if (retries < 0)
throw new InvalidOperationException("Cluster is not reachable, or no leader was selected. Out of retries, aborting.");
var topology = (await _topologyTask ?? new TailFeatherTopology());
var leader = topology.AllVotingNodes.FirstOrDefault(x => x.Name == topology.CurrentLeader);
if (leader == null)
{
_topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
return await ContactServer(operation, retries - 1);
}
// now we have a leader, we need to try calling it...
var httpResponseMessage = await operation(GetHttpClient(leader.Uri));
if (httpResponseMessage.IsSuccessStatusCode == false)
{
// we were sent to a different server, let try that...
if (httpResponseMessage.StatusCode == HttpStatusCode.Redirect)
{
var redirectUri = httpResponseMessage.Headers.Location;
httpResponseMessage = await operation(GetHttpClient(redirectUri));
if (httpResponseMessage.IsSuccessStatusCode)
{
// we successfully contacted the redirected server, this is probably the leader, let us ask it for the topology,
// it will be there for next time we access it
_topologyTask = FindLatestTopology(new[] { redirectUri }.Union(topology.AllVotingNodes.Select(x => x.Uri)));
return httpResponseMessage;
}
}
// we couldn't get to the server, and we didn't get redirected, we'll check in the cluster in general
_topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
return await ContactServer(operation, retries - 1);
}
// happy path, we are done
return httpResponseMessage;
}
There is quite a bit going on here. But the basic idea is simple. Starting from the initial list of nodes we have, contact all of them and find the topology with the highest commit index. That means that it is the freshest, so more likely to be the current one. From the topology, we take the leader, and send all queries to the leader.
If there is any sort of error, we’ll contact all the other servers to find out who we are supposed to be using now. If we can’t find the leader after three tries, we give up and let the caller sort it out, probably by retrying once the cluster is in a steady state again.
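The "freshest topology wins" rule can be looked at in isolation. In this minimal sketch the replies are plain tuples rather than the real TailFeatherTopology class, and `PickFreshest` is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: out of all the replies we got, keep the one with the highest commit index.
static (long CommitIndex, string Leader)? PickFreshest(IEnumerable<(long CommitIndex, string Leader)> replies)
{
    (long CommitIndex, string Leader)? best = null;
    foreach (var reply in replies)
    {
        if (best == null || reply.CommitIndex > best.Value.CommitIndex)
            best = reply;
    }
    return best; // null when no node answered at all
}

var replies = new[]
{
    (CommitIndex: 41L, Leader: "One"), // a node that is lagging behind
    (CommitIndex: 57L, Leader: "Two"), // the freshest view, so we trust its leader
    (CommitIndex: 57L, Leader: "Two"),
};

var freshest = PickFreshest(replies);
Console.WriteLine(freshest?.Leader);
```

A node with a stale log may still believe in an old leader, which is why the commit index is used to decide whose view to trust.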
Now, this is really nice, but it falls under the heading of weekend code. That means that this is quite far from what I would call production code. What is missing?
- Caching the topology locally in a persistent manner, so that when the known servers are down we can restart from the last known good topology.
- Proper error handling, and in particular, error reporting, to make sure that we can understand what is actually going on.
- Features such as allowing reads from non leaders, testing, etc.
But overall, I’m quite happy with this.
Weekend project means just that, I’m trying some things out, and writing something real is the best way to exercise. This isn’t going to be a full blown project, but it should be functional and usable.
The basic idea is that I’m going to build a distributed key/value configuration store. Similar to etcd, this will allow me to explore how to handle full blown Rachis from both server & client sides.
We want this to be a full blown implementation, which means persistence, snapshots, network api, the works.
In terms of the data model, we’ll go for the simplest possible one. A key/value store. A key is a string of up to 128 characters. A value is a json formatted value of up to 16Kb. Persistence will be handled by Voron. The persistence part of the project is mostly handled by Voron, so what we are left with is the following:
public enum KeyValueOperationTypes
{
Add,
Del
}
public class KeyValueOperation
{
public KeyValueOperationTypes Type;
public string Key;
public JToken Value;
}
public class OperationBatchCommand : Command
{
public KeyValueOperation[] Batch { get; set; }
}
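The size limits mentioned above (keys of up to 128 characters, JSON values of up to 16Kb) are not enforced by these classes. A minimal sketch of such a check follows. `TryValidate` is a hypothetical helper, it uses System.Text.Json rather than the Json.NET types used in the rest of the post, and it assumes that "16Kb" means 16 * 1024 bytes of UTF-8 text.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical helper: check a key/value pair against the limits of the data model.
static bool TryValidate(string key, string jsonValue, out string error)
{
    const int maxKeyLength = 128;        // characters
    const int maxValueBytes = 16 * 1024; // assumption: 16Kb measured as UTF-8 bytes

    error = "";
    if (key == null || key.Length > maxKeyLength)
    {
        error = "key must be a string of up to 128 characters";
        return false;
    }
    if (Encoding.UTF8.GetByteCount(jsonValue) > maxValueBytes)
    {
        error = "value is larger than 16Kb";
        return false;
    }
    try
    {
        using (JsonDocument.Parse(jsonValue)) { } // only checks that the text is well-formed JSON
    }
    catch (JsonException)
    {
        error = "value is not valid JSON";
        return false;
    }
    return true;
}

Console.WriteLine(TryValidate("ravendb", "{\"Url\":\"http://live-test.ravendb.net\"}", out var okError));
Console.WriteLine(TryValidate(new string('k', 129), "1", out var keyError));
Console.WriteLine(keyError);
```

Note that System.Text.Json is stricter than Json.NET: it rejects single-quoted strings, for example.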
This gives us the background for the actual state machine:
public class KeyValueStateMachine : IRaftStateMachine
{
readonly StorageEnvironment _storageEnvironment;
public KeyValueStateMachine(StorageEnvironmentOptions options)
{
_storageEnvironment = new StorageEnvironment(options);
using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
{
_storageEnvironment.CreateTree(tx, "items");
var metadata = _storageEnvironment.CreateTree(tx, "$metadata");
var readResult = metadata.Read("last-index");
if (readResult != null)
LastAppliedIndex = readResult.Reader.ReadLittleEndianInt64();
tx.Commit();
}
}
public event EventHandler<KeyValueOperation> OperatonExecuted;
protected void OnOperatonExecuted(KeyValueOperation e)
{
var handler = OperatonExecuted;
if (handler != null) handler(this, e);
}
public JToken Read(string key)
{
using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
{
var items = tx.ReadTree("items");
var readResult = items.Read(key);
if (readResult == null)
return null;
return JToken.ReadFrom(new JsonTextReader(new StreamReader(readResult.Reader.AsStream())));
}
}
public long LastAppliedIndex { get; private set; }
public void Apply(LogEntry entry, Command cmd)
{
var batch = (OperationBatchCommand)cmd;
Apply(batch.Batch, cmd.AssignedIndex);
}
private void Apply(IEnumerable<KeyValueOperation> ops, long commandIndex)
{
using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
{
var items = tx.ReadTree("items");
var metadata = tx.ReadTree("$metadata");
metadata.Add("last-index", EndianBitConverter.Little.GetBytes(commandIndex));
var ms = new MemoryStream();
foreach (var op in ops)
{
switch (op.Type)
{
case KeyValueOperationTypes.Add:
ms.SetLength(0);
var streamWriter = new StreamWriter(ms);
op.Value.WriteTo(new JsonTextWriter(streamWriter));
streamWriter.Flush();
ms.Position = 0;
items.Add(op.Key, ms);
break;
case KeyValueOperationTypes.Del:
items.Delete(op.Key);
break;
default:
throw new ArgumentOutOfRangeException();
}
OnOperatonExecuted(op);
}
tx.Commit();
}
}
public void Dispose()
{
if (_storageEnvironment != null)
_storageEnvironment.Dispose();
}
}
As you can see, there isn’t much here. Not surprising, since we are storing a key/value data structure. I’m also ignoring snapshots for now. That is good enough for now, let us go for the network portion of the work. We are going to be using Web API for the network stuff. And we’ll be initializing it like so:
var nodeName = options.NodeName ?? (Environment.MachineName + ":" + options.Port);
var kvso = StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "KeyValue"));
using (var statemachine = new KeyValueStateMachine(kvso))
{
using (var raftEngine = new RaftEngine(new RaftEngineOptions(
new NodeConnectionInfo
{
Name = nodeName,
Uri = new Uri("http://" + Environment.MachineName + ":" + options.Port),
},
StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "Raft")),
new HttpTransport(nodeName),
statemachine
)))
{
using (WebApp.Start(new StartOptions
{
Urls = { "http://+:" + options.Port + "/" }
}, builder =>
{
var httpConfiguration = new HttpConfiguration();
RaftWebApiConfig.Register(httpConfiguration);
httpConfiguration.Properties[typeof(HttpTransportBus)] = new HttpTransportBus(nodeName);
httpConfiguration.Properties[typeof(RaftEngine)] = raftEngine;
builder.UseWebApi(httpConfiguration);
}))
{
Console.WriteLine("Ready & processing requests, press ENTER to stop");
Console.ReadLine();
}
}
}
Note that we need to initialize both the state machine and the raft engine, then wire the raft engine controllers. Now we are pretty much done with setup, and we can turn to the actual semantics of running the cluster. The first thing that I want to do is to set up the baseline, so we create this base controller:
public abstract class TailFeatherController : ApiController
{
public KeyValueStateMachine StateMachine { get; private set; }
public RaftEngine RaftEngine { get; private set; }
public override async Task<HttpResponseMessage> ExecuteAsync(HttpControllerContext controllerContext, CancellationToken cancellationToken)
{
RaftEngine = (RaftEngine)controllerContext.Configuration.Properties[typeof(RaftEngine)];
StateMachine = (KeyValueStateMachine)RaftEngine.StateMachine;
try
{
return await base.ExecuteAsync(controllerContext, cancellationToken);
}
catch (NotLeadingException)
{
var currentLeader = RaftEngine.CurrentLeader;
if (currentLeader == null)
{
return new HttpResponseMessage(HttpStatusCode.PreconditionFailed)
{
Content = new StringContent("{ 'Error': 'No current leader, try again later' }")
};
}
var leaderNode = RaftEngine.CurrentTopology.GetNodeByName(currentLeader);
if (leaderNode == null)
{
return new HttpResponseMessage(HttpStatusCode.InternalServerError)
{
Content = new StringContent("{ 'Error': 'Current leader " + currentLeader + " is not found in the topology. This should not happen.' }")
};
}
return new HttpResponseMessage(HttpStatusCode.Redirect)
{
Headers =
{
Location = leaderNode.Uri
}
};
}
}
}
That is a lot of error handling, but basically it just gets the right values from the configuration and exposes them to the controller actions. The rest deals with what happens when a command that requires a leader hits a follower.
Next step, actually managing the cluster, here we go:
public class AdminController : TailFeatherController
{
[HttpGet]
[Route("tailfeather/admin/fly-with-us")]
public async Task<HttpResponseMessage> Join([FromUri] string url, [FromUri] string name)
{
var uri = new Uri(url);
name = name ?? uri.Host + (uri.IsDefaultPort ? "" : ":" + uri.Port);
await RaftEngine.AddToClusterAsync(new NodeConnectionInfo
{
Name = name,
Uri = uri
});
return new HttpResponseMessage(HttpStatusCode.Accepted);
}
[HttpGet]
[Route("tailfeather/admin/fly-away")]
public async Task<HttpResponseMessage> Leave([FromUri] string name)
{
await RaftEngine.RemoveFromClusterAsync(new NodeConnectionInfo
{
Name = name
});
return new HttpResponseMessage(HttpStatusCode.Accepted);
}
}
So now we have a way to add and remove nodes from the cluster, which is all the admin stuff that we need to handle right now. Next, we need to actually wire the operations, which is done here:
public class KeyValueController : TailFeatherController
{
[HttpGet]
[Route("tailfeather/key-val/read")]
public HttpResponseMessage Read([FromUri] string key)
{
var read = StateMachine.Read(key);
if (read == null)
{
return Request.CreateResponse(HttpStatusCode.NotFound, new
{
RaftEngine.State,
Key = key,
Missing = true
});
}
return Request.CreateResponse(HttpStatusCode.OK, new
{
RaftEngine.State,
Key = key,
Value = read
});
}
[HttpGet]
[Route("tailfeather/key-val/set")]
public Task<HttpResponseMessage> Set([FromUri] string key, [FromUri] string val)
{
JToken jVal;
try
{
jVal = JToken.Parse(val);
}
catch (JsonReaderException)
{
jVal = val;
}
var op = new KeyValueOperation
{
Key = key,
Type = KeyValueOperationTypes.Add,
Value = jVal
};
return Batch(new[] { op });
}
[HttpGet]
[Route("tailfeather/key-val/del")]
public Task<HttpResponseMessage> Del([FromUri] string key)
{
var op = new KeyValueOperation
{
Key = key,
Type = KeyValueOperationTypes.Del,
};
return Batch(new[] { op });
}
[HttpPost]
[Route("tailfeather/key-val/batch")]
public async Task<HttpResponseMessage> Batch()
{
var stream = await Request.Content.ReadAsStreamAsync();
var operations = new JsonSerializer().Deserialize<KeyValueOperation[]>(new JsonTextReader(new StreamReader(stream)));
return await Batch(operations);
}
private async Task<HttpResponseMessage> Batch(KeyValueOperation[] operations)
{
var taskCompletionSource = new TaskCompletionSource<object>();
RaftEngine.AppendCommand(new OperationBatchCommand
{
Batch = operations,
Completion = taskCompletionSource
});
await taskCompletionSource.Task;
return Request.CreateResponse(HttpStatusCode.Accepted);
}
}
And we are pretty much set.
Note that I’ve been writing this post while I’m writing the code, so I’ve made some small changes, you can see actual code here.
Anyway, we are pretty much done. Now we can compile and try testing what is going on.
First, we seed the cluster, by running:
.\TailFeather.exe --port=9079 --DataPath=One --Name=One --Bootstrap
This tells the node that it is allowed to become a leader without having to pre-configure a cluster. This command runs and exits, so now we’ll run three such copies:
- start .\TailFeather.exe "--port=9079 --DataPath=One --Name=One"
- start .\TailFeather.exe "--port=9078 --DataPath=Two --Name=Two"
- start .\TailFeather.exe "--port=9077 --DataPath=Three --Name=Three"
We have all three nodes up and running, so now is the time to actually make use of it:
http://localhost:9079/tailfeather/key-val/set?key=ravendb&val={ 'Url': 'http://live-test.ravendb.net', 'Database': 'Sample' }
In this case, you can see that we are setting a configuration value to point to a RavenDB database on the first node. Note that at this point, we have a single node cluster, and the other two nodes are waiting to join it, but are taking no action.
We can get the value back using:
So far, so good. Now, let us add a second node in by inviting it to fly with our cluster. We do that using the following command:
http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9078&name=Two
Which will give us:
Note that we are using the node we just added to look at this.
Next, we can add the third node.
http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9077&name=Three
I would put the image in, but I think you get the point.
This is it for now. We have a highly available persistent & distributed key/value store. Next, we need to tackle the idea of snapshots and the client API, but I’ll deal with that in another post.
Rafal and Ben Foster commented on my previous post with some ideas on how to deal with incremental updates to map/reduce indexes. Rafal said:
Actually, it's quite simple if you can 'reverse' the mapping operation (for given key find all documents matching that key): you just delete aggregate record with specified key and run incremental map-reduce on all matching documents. In today's example, you would delete the aggregate with key='oren' and then run map reduce with a query:
db.items.mapReduce(map,reduce, { out: {reduce: 'distinct_item_names'}, query: {name: 'oren' } });
And Ben said:
It's worth mentioning that I was able to get the MongoDB map-reduce collections updating automatically (insert/update/delete) by monitoring the MongoDB OpLog …
…and listen for new documents in the OpLog which could then be used to re-execute an incremental Map-Reduce.
And while this looks right, this actually can’t possibly work. I’ll start with Rafal’s suggestion. He suggests just issuing the following set of commands whenever we delete something from the database:
db.distinct_item_names.remove({name: 'oren' });
db.items.mapReduce(map,reduce, { out: {reduce: 'distinct_item_names'}, query: {name: 'oren' } });
And yes, that will actually work, as long as you are careful to never do this concurrently. Because if you do run this concurrently… well, the best you can hope for is no data, but the likelier scenario is data corruption.
But this actually gets better. Deletes are annoying, but they are a relatively simple case to process. You have updates to deal with too. We’ll assume that we are watching the oplog to get notified when this happens. Here is a MongoDB oplog entry:
{
    "ts": {
        "t": 1286821984000,
        "i": 1
    },
    "h": "1633487572904743924",
    "op": "u",
    "ns": "items",
    "o2": {
        "_id": "4cb35859007cc1f4f9f7f85d"
    },
    "o": {
        "$set": {
            "Name": "Eini"
        }
    }
}
As you can see, we have an update operation (op: u) on a specific document (o2._id) with the specified update (o.$set). That is really great, and it is utterly useless for our purposes. In this case, we updated the name from Oren to Eini, so we would like to be able to run this:
db.distinct_item_names.remove({name: 'oren' });
db.distinct_item_names.remove({name: 'eini' });
db.items.mapReduce(map,reduce, { out: {reduce: 'distinct_item_names'}, query: {name: 'oren' } });
db.items.mapReduce(map,reduce, { out: {reduce: 'distinct_item_names'}, query: {name: 'eini' } });
Except that we don’t have any way to get the old value out from the oplog. And this still isn’t going to work concurrently.
But let us say that we decided to have a watcher process monitor the oplog somehow, and it will ensure no concurrency of those requests. Now you have to deal with fun issues like: “what happens if the watcher process recycles?” How do you keep your place in the oplog? (And remember, the oplog is capped, so entries that you haven’t seen yet might be removed once the oplog grows beyond the specified size.)
And… to be frank, once we have done all of that, this is still the easy part. One of the reasons that you want to do this work in the first place is to deal with large amount of data. But you cannot assume that you’ll have even distribution of the data.
One bug report that came in against the RavenDB map/reduce implementation involved a map/reduce index on the US Census data. That is ~300 million documents, and the index the user wanted to build was a map/reduce that groups by state. You have states like California, with more than 30 million people in it, and you realize that you don’t want to have to redo the map/reduce over the entire 30+ million documents that you have there. In RavenDB, under this scenario, you’ll have to issue about 3,073 operations, by the way, versus the 30 million you would need for this approach.
So yeah, “incremental” map/reduce can’t handle concurrent work, can’t handle deletes, can’t handle updates, and definitely shouldn’t be used on large data sets. And that is after you went to the trouble of setting up the watcher process, monitoring the oplog, etc.
Or, you can use RavenDB and you get a true incremental map/reduce without having to worry about any of that.
Ben Foster has a really cool article showing some of the similarities and differences between MongoDB & RavenDB with regards to their map/reduce implementation.
However, there is a very important distinction that was missed. Map/reduce operations are run online in MongoDB, which means that for large collections, map/reduce is going to be very expensive. MongoDB has the option of taking the result of a map/reduce operation and writing it to a collection, so you don’t need to run map/reduce jobs all the time. However, that is a snapshot view of the data, not a live view. Ben mentioned that you can do something called incremental map/reduce, but that isn’t actually a good idea at all.
Let us look at the following sequence of operations:
db.items.insert({name: 'oren', ts: 1 });
db.items.insert({name: 'ayende', ts: 2});

var map = function Map() { emit(this.name,null); };
var reduce = function(key, val) { return key; };

db.items.mapReduce(map,reduce, { out: 'distinct_item_names' });
This creates two items, and gives me the distinct names in a separate collection. Now, let us see how that works with updates…
db.items.insert({name: 'eini', ts: 3 });

db.items.mapReduce(map,reduce, { out: {reduce: 'distinct_item_names'}, query: {ts: {$gt: 2} } });
This is actually nice, mongo is able to merge the previous results with the new results, so you only have to do the work on the new data. But this has several implications:
- You have to kick something like a ‘ts’ property around to check for new stuff. And you have to _update_ that ts property on every update.
- You have to run this on a regular basis yourself, mongo won’t do that for you.
- It can’t work with deletes.
It is the last part that is really painful:
db.items.remove({name: 'oren'});
Now, there is just no way for you to construct a map/reduce job that would remove the name when it is gone.
This sort of thing works very nicely when all you want is to append stuff. That is easy. It is a PITA when we are talking about actually using it for live data that can change and be modified.
Contrast that with the map/reduce implementation in RavenDB:
- No need to manually maintain state, the database does it for you.
- No need to worry about updates & deletes, the database does it for you.
- No need to schedule map/reduce job updates, the database does it for you.
- Map/reduce queries are very fast, regardless of data size.
To be frank, the map/reduce implementation in RavenDB is complex, and pretty much all of it comes down to the fact that we don’t do stupid stuff like run a map/reduce operation on a large database on every query, and that we support edge case scenarios like data that is actually updated or deleted.
Naturally I’m biased, but it seems to me that trying to use map/reduce in Mongo just means that you have to do a lot of hand holding yourself, while with RavenDB, we take care of everything and leave you to actually do stuff.
I was asked to review the book (and received a free electronic copy).
As someone that is very into storage engines, I was quite excited about this. After going over the leveldb codebase, I would finally get to read a real book about how it works.
I was disappointed, badly.
This book isn’t really about leveldb. It contains pretty much no background, explanation, history or anything much at all about how leveldb works. Instead, it is pretty much a guide on how to use LevelDB to write iOS applications. There are a lot of chapters dealing with Objective-C, NSString and variants, how to do binding, how to handle drag and drop.
However, the things that I would expect, such as explanations of how it works, what it does, alternative use cases, etc., are very rare, if there at all. Only chapter 10 is really worth reading, and even so, I got the feeling that it only made sense to me because I already knew quite a lot about leveldb. I can’t imagine starting from scratch and actually being able to understand leveldb from this book.
If you are working on iOS apps / OS X, I guess that this might be a good choice, but only if you want to know about actually implementing leveldb. You’ll need to do your actual leveldb learning elsewhere.
The book does contain some interesting tidbits. Chapter 10 talks about tuning and key policies, and it did have some interesting things to say, but it also contains wrong information* (and if I could spot it, with my relatively little experience with leveldb, I’m pretty sure that there are other things there that are wrong, too).
* The book said that it is better to write keys in order, to reduce I/O. But leveldb writes to a skip list in memory, then flushes that entire thing in sorted fashion to disk. Your writes have to be bigger than the buffer size of that to actually matter, and that still won’t help you much.
In short, feel free to skip this book, unless you are very focused on writing leveldb apps on iOS. In which case it might be worth it, but I don’t think so. You are better off reading the docs or any of the tutorials.
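A minimal sketch of why write order matters so little, assuming a write buffer that is flushed in sorted order once it fills up. `MemTable` and `writeAll` are hypothetical names for this illustration, not the leveldb API, and a plain `Map` sorted at flush time stands in for the skip list.

```javascript
// Illustrative write buffer: collects writes in memory, emits them sorted on flush.
// A real skip list keeps entries ordered on insert; sorting at flush time gives
// the same on-disk result and keeps the sketch short.
class MemTable {
  constructor(limitBytes) {
    this.limitBytes = limitBytes; // assumed buffer size, in characters for simplicity
    this.entries = new Map();
    this.bytes = 0;
  }

  // Returns true when the buffer is full and the caller should flush.
  put(key, value) {
    const old = this.entries.get(key);
    if (old !== undefined) this.bytes -= key.length + old.length;
    this.entries.set(key, value);
    this.bytes += key.length + value.length;
    return this.bytes >= this.limitBytes;
  }

  // Produces the entries in key order, as they would be written to disk.
  flush() {
    const sorted = [...this.entries].sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
    this.entries = new Map();
    this.bytes = 0;
    return sorted;
  }
}

function writeAll(keys) {
  const table = new MemTable(1024);
  for (const key of keys) table.put(key, 'v');
  return table.flush().map(([key]) => key);
}

console.log(writeAll(['c', 'a', 'b'])); // random order in
console.log(writeAll(['a', 'b', 'c'])); // sorted order in
```

Both calls produce the same flushed sequence, so as long as the writes fit in one buffer, the order in which the application issued them does not change what reaches the disk.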
One of the worst things that can happen to you professionally is stagnation. You know what you are doing, you know how it works, and you can coast along very easily. Unfortunately, there is the old saying: it isn’t what we know that we don’t know that is going to hurt us. It is what we don’t know that we don’t know that is going to bite us in the end.
One of the reasons that I have routinely been going out and searching for difficult codebases to read has been to avoid that. I know that I don’t know a lot, I just don’t know what I don’t know. So I go into an unfamiliar codebase and try to figure out how things work over there.
I have been doing that for quite some time now. And I am not talking about looking at some sample project a poor schlump put out to show how you can do CQRS with 17 projects to create a ToDo app. I am talking about production code, and usually in areas or languages that I am not familiar with.
A short list of the stuff that I have gone over:
- CouchDB (to learn Erlang, actually, but that got me to do DB stuff).
- LevelDB
- LMDB
- NServiceBus
- Mass Transit
- SignalR
- Hibernate
- Hibernate Search
Those are codebases that do interesting things that I wanted to learn from. Indeed, I have learned from each of those.
Some people can learn by reading academic papers, I find that I learn best from having a vague idea about what is going on, then diving into the implementation details and seeing how it all fits together.
But the entire post so far was a preface to the question I wanted to ask. If you are reading this post, I am pretty sure that you are a professional developer. Doctors, lawyers and engineers (to name a few) have to recertify every so often, to make sure that they are current. But I have seen all too many developers stagnate to the point where they are very effective in their chosen field (building web apps with jQuery Mobile on ASP.Net WebForms 3.5) and nearly useless otherwise.
So, how are you keeping your skills sharp and your knowledge current? What have you been learning lately? It can be a course, or a book or a side project or just reading code. But, in my opinion, it cannot be something passive. If you were going to answer: “I read your blog” as the answer to that question, that is not sufficient, flatterer. Although, you might want to go a bit further and consider that imitation is the sincerest form of flattery, so go ahead and do something.
Okay, having gone through the LMDB codebase with a fine toothed comb, I think that I can safely say that it is both a very impressive codebase and one that dearly needs some TLC. I’ll freely admit that I am by no means a C guy. And it is entirely possible that a lot of the issues that have been bugging me are standard C things. But I don’t think so. Methods that go on for hundreds of lines, duplicated code and a plethora of gotos hardly seem to be the things that pop to mind when I hear good C code.
But beyond my issues with the code, the implementation is really quite brilliant. The way LMDB manages to pack so much functionality by not doing things is quite impressive. Interestingly, you couldn’t write this database even 5 years ago. LMDB relies on being able to map the db into memory, and up until x64 became prevalent, you just couldn’t do that for any db with a meaningful size. With x64 and the effectively unlimited address space we have (will I be laughing at my naivety in a few years?), that is no longer an issue.
I learned quite a lot from the project, and it has been a frustrating, annoying and fascinating experience.
The second item to go over with the World’s smallest No SQL database is about persistence, and following that, I/O. Right now, there is no persistence. If you restart the process, all the data is gone. Now, there are actually quite a few real world dbs that behave in this fashion. But they are a rarity. For the most part, if I put data inside a db, I expect it to be there until I do something about it.
And at that point, you are in for quite a bit of complexity. How are you going to persist the data? The easiest way to do it, just creating a file for every value in the db, is going to be… problematic on most systems. So you need to put a lot of data in a small set of files. Which means that you have to decide how you are going to put the data together. In general, there is the fixed size option, in which you divide the file(s) into pages and work around that. The good thing about this is that it gives you the ability to reuse space in the file after deletes / updates. The bad thing about it is that it is quite complex. Alternatively, you can just write the data out as needed, but then you can’t really update written data, and would need to run compactions.
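The second option, writing data out as needed and compacting later, can be sketched roughly as follows. `AppendOnlyStore` is a hypothetical name, and an in-memory array stands in for the data file, so this shows only the shape of the idea and not real I/O.

```javascript
// Illustrative append-only store: updates and deletes never touch old records,
// they only append new ones. Compaction rewrites the log with live data only.
class AppendOnlyStore {
  constructor() {
    this.log = []; // stands in for the data file
  }

  set(key, value) {
    this.log.push({ key, value });
  }

  del(key) {
    this.log.push({ key, deleted: true }); // a tombstone record
  }

  // The newest record for a key wins, so scan from the end.
  get(key) {
    for (let i = this.log.length - 1; i >= 0; i--) {
      const record = this.log[i];
      if (record.key === key) return record.deleted ? undefined : record.value;
    }
    return undefined;
  }

  // Keep only the latest record per key and drop tombstones.
  compact() {
    const latest = new Map();
    for (const record of this.log) {
      latest.delete(record.key); // re-insert so order follows the last write
      latest.set(record.key, record);
    }
    this.log = [...latest.values()].filter(record => !record.deleted);
  }
}

const store = new AppendOnlyStore();
store.set('a', '1');
store.set('a', '2');
store.set('b', '1');
store.del('b');
console.log(store.log.length); // records before compaction
store.compact();
console.log(store.log.length, store.get('a')); // records after compaction
```

The cost of the simplicity is visible here: the log keeps growing with every update until a compaction pass reclaims the space.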
And we haven’t talked about searching yet. Some DBs, like Bitcask / Munin, would actually store the keys in memory, and store the position on the disk for retrieving the value. But for the most part, both keys & values tend to be on disk in some form. In CouchDB, they are held inside an append only B+Tree. In LevelDB, they are held in Sorted String Tables. LMDB uses a Copy-On-Write B+Tree. Esent uses a B+Tree with a Log file.
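A rough sketch of the keys-in-memory approach, assuming a single append-only data file and an in-memory map from each key to the position of its latest value. `KeyDirStore` is a hypothetical name, and a string stands in for the file; this is not Bitcask’s actual format.

```javascript
// Illustrative key directory: values live in the "file", keys live in memory
// together with the offset and length needed to read the value back.
class KeyDirStore {
  constructor() {
    this.file = '';          // stands in for the on-disk data file
    this.keyDir = new Map(); // key -> { offset, length }
  }

  set(key, value) {
    const offset = this.file.length;
    this.file += value; // append only, old values are left in place
    this.keyDir.set(key, { offset, length: value.length });
  }

  // One lookup in memory, then one positioned read from the file.
  get(key) {
    const pos = this.keyDir.get(key);
    if (pos === undefined) return undefined;
    return this.file.slice(pos.offset, pos.offset + pos.length);
  }
}

const db = new KeyDirStore();
db.set('users/1', 'oren');
db.set('users/2', 'ayende');
db.set('users/1', 'eini'); // an update just appends and repoints the key
console.log(db.get('users/1'), db.get('users/2'));
```

The trade-off is that every key has to fit in memory, while the values can be as large as the disk allows.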
In each of those cases, the actual semantics for persistent data involve at least two concerns. You need to actually be able to search the data (that usually means more than just O(1) access, you want to be able to go back & forth on the keys) and you need to be able to do a transactional save. This is so you can recover in case of a crash, most of the time.
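The difference between O(1) access and being able to go back & forth on the keys can be sketched with a sorted key list and a binary search. `lowerBound` and `range` are hypothetical helpers for this illustration; a real engine would do the same walk over a B+Tree or a sorted table on disk.

```javascript
// Find the index of the first key that is >= the requested key.
function lowerBound(sortedKeys, key) {
  let lo = 0;
  let hi = sortedKeys.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    if (sortedKeys[mid] < key) lo = mid + 1;
    else hi = mid;
  }
  return lo;
}

// Return all keys in the half-open interval [from, to), in order.
function range(sortedKeys, from, to) {
  const out = [];
  for (let i = lowerBound(sortedKeys, from); i < sortedKeys.length && sortedKeys[i] < to; i++) {
    out.push(sortedKeys[i]);
  }
  return out;
}

const keys = ['users/1', 'users/2', 'users/3', 'villages/1'];
console.log(range(keys, 'users/2', 'users/9'));
```

A hash table can answer "give me users/2" in O(1), but it cannot answer "give me everything from users/2 onward" without scanning every key, which is why ordered structures are so common in storage engines.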
But there is actually a lot more that goes into the selection of the proper persistence format. To start with, how you store the data on disk will have a big effect on your performance. If you store the data as a linked list, for example, you might as well kiss your performance goodbye. Beyond that, we also have issues with things like how the format is going to scale when we have concurrent readers. For example, if you have something that does a lot of seeks, and relies on the seeks always going forward to ensure performance, that is going to be hit very badly the moment that you have concurrent readers doing concurrent reads on different parts of the system. You would be forcing the system to do random seeks.
There are other considerations, as well. For example, if you are using something like B+Tree, it is likely that you’ll be overwriting the same section on the disk multiple times. That is fine with HDD, but SSD would just give up & die on you at some point. And we haven’t started talking about secondary indexes yet…