Tail/Feather–Snapshots

The Raft protocol gives us a stable replicated distributed log. In other words, all servers in the cluster will agree on all the committed entries to the log (both what they are, and in what position). We usually fill the log with operations that a state machine will execute.

In the Tail/Feather example, the commands are set/del operations on the key value store. Note that this doesn’t mean that all servers will always have the same state. It is possible that a server (or set of servers) will have an outdated view of the log, but the log that they have will match up to the point that they have it.

So, what is the problem? What happens when we have an active system? Well, every time that we make a modification, we’ll add it to the log. That is all well and good, but what about the log itself? Well, it is going to stay there, because we need it to catch up any new server that joins the cluster. But that means that over time, the log is going to grow without bound. Which isn’t a very nice thing to have.

Rachis handles this by asking the state machine to implement snapshots: a way to take the current state of the state machine and transmit it over the network. For example, assume that we have a log full of these entries:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 2}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 3}

// ...

{ Op: "Add", Key: "users/1/login-attempts", "Value": 300000}

The log for that is 300,000 entries long, but the current state of the machine is just:

{ "users/1/login-attempts": 300000 }

Which is obviously much smaller. Rachis doesn’t force a state machine to implement this, but if it isn’t doing so, we can never clear the log. But implementing snapshots has its own problems.
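To make the size difference concrete, here is a minimal sketch of replaying such a log into its final state. The LogEntry and LogCompactionSketch types are hypothetical and exist only for illustration; they are not part of Rachis or Tail/Feather. The sketch assumes that "Add" overwrites the value (as in the example above) and that a "Del" operation removes the key.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical entry type for illustration only.
public sealed class LogEntry
{
    public string Op;
    public string Key;
    public long Value;
}

public static class LogCompactionSketch
{
    // Replays every entry in order, the way a state machine would,
    // and returns the resulting key/value state.
    public static Dictionary<string, long> Replay(IEnumerable<LogEntry> log)
    {
        var state = new Dictionary<string, long>();
        foreach (var entry in log)
        {
            if (entry.Op == "Add")
                state[entry.Key] = entry.Value; // later entries overwrite earlier ones
            else if (entry.Op == "Del")         // assumed name for the delete operation
                state.Remove(entry.Key);
        }
        return state;
    }

    // Builds a log shaped like the example above: n writes to the same key.
    public static IEnumerable<LogEntry> SameKeyLog(int n)
    {
        for (long i = 1; i <= n; i++)
            yield return new LogEntry { Op = "Add", Key = "users/1/login-attempts", Value = i };
    }
}
```

Replaying LogCompactionSketch.SameKeyLog(300000) walks 300,000 entries but leaves a dictionary with a single key, which is all a snapshot would have to carry.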

What about the actual cost of creating the snapshot? Imagine that we ask the state machine for a snapshot every 10,000 entries. In the example above, that would mean just writing out { "users/1/login-attempts": 300000 } or whatever the actual current value is. But now consider a log like this one:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/2/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/3/login-attempts", "Value": 1}

// ...

{ Op: "Add", Key: "users/300000/login-attempts", "Value": 1}

Note that instead of having 300,000 changes to the same key, we are going to have 300,000 keys. In this case, writing the full list down on every snapshot is very expensive. That is the problem that incremental backups are here to solve. We let Voron know that this is what we want by specifying:

options.IncrementalBackupEnabled = true;

And now it is time to define policies about taking snapshots. We are going to handle this using Voron full & incremental snapshots. You can see the logic in the following code.

public void CreateSnapshot(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    // we have no snapshot files, so this is the first time that we create a snapshot
    // we handle that by asking voron to create a full backup
    var files = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");
    Array.Sort(files, StringComparer.OrdinalIgnoreCase); // make sure we get it in sort order
    if (files.Any() == false)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    string lastFullBackup = null;
    int fullBackupIndex = -1;
    for (int i = files.Length - 1; i >= 0; i--)
    {
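        // Directory.GetFiles returns paths that include the directory, so compare the file name only
        if (!Path.GetFileName(files[i]).StartsWith("Full", StringComparison.OrdinalIgnoreCase))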
            continue;
        fullBackupIndex = i;
        lastFullBackup = files[i];
        break;
    }
            
    if (lastFullBackup == null)
    {
        // this shouldn't be the case, we must always have at least one full backup. 
        // maybe user deleted it? We'll do a full backup here to compensate
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
            
    var fullBackupSize = new FileInfo(lastFullBackup).Length;
    var incrementalBackupsSize = files.Skip(fullBackupIndex + 1).Sum(f => new FileInfo(f).Length);

    // now we need to decide whether to do a full or incremental backup, doing incremental backups stops 
    // making sense once they take too much space relative to the full backup. Our cutoff point is when
    // they pass 50% of the size of the full backup.
    // If full backup size is 1 GB, and we have 25 incremental backups that are 600 MB in size, we need to transfer
    // 1.6 GB to restore. If we generate a new full backup, we'll only need to transfer 1 GB to restore.

    if (incrementalBackupsSize > fullBackupSize / 2)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }

    DeleteOldSnapshots(files.Take(fullBackupIndex)); // delete snapshots older than the current full backup

    var incrementalBackup = new IncrementalBackup();
    incrementalBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Inc-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set);
}

private void DoFullBackup(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    var snapshotsToDelete = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");

    var fullBackup = new FullBackup();
    fullBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Full-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set
        );

    DeleteOldSnapshots(snapshotsToDelete);
}

private static void DeleteOldSnapshots(IEnumerable<string> snapshotsToDelete)
{
    foreach (var snapshot in snapshotsToDelete)
    {
        try
        {
            File.Delete(snapshot);
        }
        catch (Exception)
        {
            // we ignore snapshots we can't delete, they are expected if we are concurrently writing
            // the snapshot and creating a new one. We'll get them the next time.
        }
    }
}

Basically, we need to strike a balance between full and incremental backups. We do that by first taking a full backup, and then taking incremental backups until their combined size exceeds 50% of the full backup, at which point we are probably better off doing another full backup. Note that we use the event of a full backup to clear the old incremental and full backup files.
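The decision itself can be pulled out into a small pure function, which makes the 50% rule easy to reason about without touching the file system. This is only a sketch: SnapshotKind and SnapshotPolicySketch are hypothetical names, and the sizes are passed in directly rather than read from the snapshot files.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum SnapshotKind { Full, Incremental }

public static class SnapshotPolicySketch
{
    // fullBackupSize: size in bytes of the latest full backup, or null if there is none.
    // incrementalSizes: sizes in bytes of the incremental backups taken since that full backup.
    public static SnapshotKind Decide(long? fullBackupSize, IEnumerable<long> incrementalSizes)
    {
        // No full backup to build on, so we have to start with one.
        if (fullBackupSize == null)
            return SnapshotKind.Full;

        long incrementalTotal = incrementalSizes.Sum();

        // Once the incrementals pass half the size of the full backup,
        // a fresh full backup is cheaper to transfer on restore.
        return incrementalTotal > fullBackupSize.Value / 2
            ? SnapshotKind.Full
            : SnapshotKind.Incremental;
    }
}
```

With a 1 GB full backup, 300 MB of incremental backups still yields Incremental, while the 600 MB case from the code comment yields Full.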

And with that, we can move to actually sending the snapshot over the wire. This is exposed by the GetSnapshotWriter() method, which just hands off all the responsibility to the SnapshotWriter:

public ISnapshotWriter GetSnapshotWriter()
{
    return new SnapshotWriter(this);
}

public class SnapshotWriter : ISnapshotWriter
{
    private readonly KeyValueStateMachine _parent;

    private List<FileStream> _files = new List<FileStream>();

    public SnapshotWriter(KeyValueStateMachine parent)
    {
        _parent = parent;
        var files = Directory.GetFiles(_parent._storageEnvironment.Options.BasePath, "*.Snapshot");
        var fullBackupIndex = GetFullBackupIndex(files);

        if (fullBackupIndex == -1)
            throw new InvalidOperationException("Could not find a full backup file to start the snapshot writing");

        var last = Path.GetFileNameWithoutExtension(files[files.Length-1]);
        Debug.Assert(last != null);
        var parts = last.Split('-');
        if(parts.Length != 3)
            throw new InvalidOperationException("Invalid snapshot file name " + files[files.Length - 1] + ", could not figure out index & term");

        Index = long.Parse(parts[1]);
        Term = long.Parse(parts[2]);

        for (int i = fullBackupIndex; i < files.Length; i++)
        {
            _files.Add(File.OpenRead(files[i]));
        }
    }

    public void Dispose()
    {
        foreach (var file in _files)
        {
            file.Dispose();
        }
    }

    public long Index { get; private set; }
    public long Term { get; private set; }
    public void WriteSnapshot(Stream stream)
    {
        var writer = new BinaryWriter(stream);
        writer.Write(_files.Count);
        foreach (var file in _files)
        {
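            // send only the file name; FileStream.Name includes the directory on this machine
            writer.Write(Path.GetFileName(file.Name));
            writer.Write(file.Length);
            writer.Flush();
            file.CopyTo(stream);
        }
    }
}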

What is going on here? We get the snapshot files, and find the latest full backup, then we open all the files that we’ll need for the snapshot (the last full backup and everything afterward). We need to open them in the constructor to lock them against deletion by the CreateSnapshot() method.

Then we just concatenate them all and send them over the wire. And getting them? That is pretty easy as well:

public void ApplySnapshot(long term, long index, Stream stream)
{
    var basePath = _storageEnvironment.Options.BasePath;
    _storageEnvironment.Dispose();

    foreach (var file in Directory.EnumerateFiles(basePath))
    {
        File.Delete(file);
    }

    var files = new List<string>();

    var buffer = new byte[1024*16];
    var reader = new BinaryReader(stream);
    var filesCount = reader.ReadInt32();
    if (filesCount == 0)
        throw new InvalidOperationException("Snapshot cannot contain zero files");
    for (int i = 0; i < filesCount; i++)
    {
        var name = reader.ReadString();
        files.Add(name);
        var len = reader.ReadInt64();
        using (var file = File.Create(Path.Combine(basePath, name)))
        {
            file.SetLength(len);
            long totalFileRead = 0; // long, so files larger than 2 GB do not overflow the counter
            while (totalFileRead < len)
            {
                var read = stream.Read(buffer, 0, (int) Math.Min(buffer.Length, len - totalFileRead));
                if (read == 0)
                    throw new EndOfStreamException();
                totalFileRead += read;
                file.Write(buffer, 0, read);
            }
        }
    }
            
    new FullBackup().Restore(Path.Combine(basePath, files[0]), basePath);

    var options = StorageEnvironmentOptions.ForPath(basePath);
    options.IncrementalBackupEnabled = true;
    //TODO: Copy any other customizations that might have happened on the options

    new IncrementalBackup().Restore(options, files.Skip(1));

    _storageEnvironment = new StorageEnvironment(options);

    using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
    {
        var metadata = tx.ReadTree("$metadata");
        metadata.Add("last-index", EndianBitConverter.Little.GetBytes(index));
        LastAppliedIndex = index;
        tx.Commit();
    }
}

Unpack the snapshots from the stream, then apply the full backup first, followed by all the incremental backups. Make sure to update the last applied index, and we are set.
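To see the framing shared by WriteSnapshot() and ApplySnapshot() in isolation (a file count, then for each file its name, its length, and its raw bytes), here is a minimal round trip. SnapshotFramingSketch is a hypothetical helper, and for brevity it keeps each file's content in a byte array, where the real code streams from and to disk in chunks.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class SnapshotFramingSketch
{
    // Writes: file count, then for each file its name, its length, and its raw bytes.
    // The writer is deliberately not disposed, so the caller keeps ownership of the stream.
    public static void Write(Stream output, IList<KeyValuePair<string, byte[]>> files)
    {
        var writer = new BinaryWriter(output);
        writer.Write(files.Count);                  // Int32 file count
        foreach (var file in files)
        {
            writer.Write(file.Key);                 // length-prefixed file name
            writer.Write((long)file.Value.Length);  // Int64 content length
            writer.Write(file.Value);               // raw bytes, no extra prefix
        }
        writer.Flush();
    }

    // Reads the same layout back, failing if the stream ends before a file is complete.
    public static List<KeyValuePair<string, byte[]>> Read(Stream input)
    {
        var reader = new BinaryReader(input);
        int count = reader.ReadInt32();
        if (count <= 0)
            throw new InvalidOperationException("Snapshot must contain at least one file");

        var files = new List<KeyValuePair<string, byte[]>>(count);
        for (int i = 0; i < count; i++)
        {
            string name = reader.ReadString();
            long length = reader.ReadInt64();
            byte[] content = reader.ReadBytes((int)length); // sketch only: assumes small files
            if (content.Length != length)
                throw new EndOfStreamException();
            files.Add(new KeyValuePair<string, byte[]>(name, content));
        }
        return files;
    }
}
```

Writing a full backup followed by its incremental backups into a MemoryStream, rewinding it, and reading it back returns the same names and bytes in the same order, which is what lets the receiving side restore files[0] as the full backup and the rest as incrementals.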