Building a managed, persistent, transactional queue

There are two main approaches to building transactional systems:

  • Transaction log
  • Append only

In both cases, we rely on a very important property: fsync. fsync is actually a Unix call, which flushes all data to the actual device. In essence, this means “write to the actual hardware and don’t return until the hardware has confirmed that the data was saved”. On Windows, the equivalents are FlushFileBuffers and WriteThrough (the FILE_FLAG_WRITE_THROUGH flag, exposed in .NET as FileOptions.WriteThrough). WriteThrough is better if you would otherwise need to call FlushFileBuffers after every single write, while FlushFileBuffers is significantly faster if you only need to call it once in a while.

FileStream.Flush(true) in .NET 4.0 translates to FlushFileBuffers.
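To make the two options concrete, here is a minimal sketch of both durability strategies in .NET. The class and method names (DurableWrites, WriteThroughEveryTime, FlushOnceAtEnd) are made up for illustration; FileOptions.WriteThrough and FileStream.Flush(true) are the only real APIs it relies on, and the 4 KB buffer size is an arbitrary choice.

```csharp
using System;
using System.IO;

public static class DurableWrites
{
    // Option 1: open the file with write-through, which asks the OS to send
    // each write to the device instead of leaving it in the OS cache.
    public static void WriteThroughEveryTime(string path, byte[][] records)
    {
        using (var stream = new FileStream(path, FileMode.Create, FileAccess.Write,
            FileShare.Read, 4096, FileOptions.WriteThrough))
        {
            foreach (var record in records)
            {
                stream.Write(record, 0, record.Length);
                // Push the managed buffer down to the OS after every record.
                stream.Flush();
            }
        }
    }

    // Option 2: write normally and pay for a single flush to disk at the end.
    public static void FlushOnceAtEnd(string path, byte[][] records)
    {
        using (var stream = new FileStream(path, FileMode.Create, FileAccess.Write,
            FileShare.Read, 4096, FileOptions.None))
        {
            foreach (var record in records)
                stream.Write(record, 0, record.Length);

            // flushToDisk: true also flushes the OS buffers for this file.
            stream.Flush(true);
        }
    }
}
```

Which one is appropriate depends on how often the data has to be durable: after every write, or only at a commit point.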

Transaction log systems tend to be more complex than append only systems, but append only systems use more space. Space tends to be pretty cheap, so that is a good tradeoff, I think.

Given that, let us see how we can build a managed transactional persistent queue. One interesting aspect is that append only, just like immutable data structures, is really not friendly to things like queues. The problem is that adding an item to the queue requires modifying the entire queue, which results in a large number of memory allocations / writes to disk.

The functional crowd has solved the problem long ago, it seems. In essence, a functional queue is composed of two immutable lists, one for the front of the queue and the other for the back of the queue. You add things to the back of the queue, and pop things from the front of the queue. When the front of the queue is empty, you set the front of the queue to be the reversed back of the queue and clear the back. The link should make it clear how this works.
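As an in-memory illustration of the idea, here is a minimal sketch of a functional queue built from two immutable linked lists. The names Node and FunctionalQueue are hypothetical and are not part of the persistent implementation in this post; the sketch only shows the front/back reversal trick.

```csharp
using System;

// Immutable singly linked list node; null stands for the empty list.
public sealed class Node<T>
{
    public readonly T Value;
    public readonly Node<T> Next;

    public Node(T value, Node<T> next)
    {
        Value = value;
        Next = next;
    }
}

public sealed class FunctionalQueue<T>
{
    public static readonly FunctionalQueue<T> Empty = new FunctionalQueue<T>(null, null);

    private readonly Node<T> front; // head is the next item to dequeue
    private readonly Node<T> back;  // head is the most recently enqueued item

    private FunctionalQueue(Node<T> front, Node<T> back)
    {
        this.front = front;
        this.back = back;
    }

    public bool IsEmpty
    {
        get { return front == null && back == null; }
    }

    // Allocates a single node; both existing lists are shared, not copied.
    public FunctionalQueue<T> Enqueue(T value)
    {
        return new FunctionalQueue<T>(front, new Node<T>(value, back));
    }

    // Returns the queue without its first item and hands that item back via 'value'.
    public FunctionalQueue<T> Dequeue(out T value)
    {
        var f = front;
        var b = back;
        if (f == null)
        {
            // Front is exhausted: the reversed back becomes the new front,
            // and the back ends up empty (b is null after the loop).
            for (; b != null; b = b.Next)
                f = new Node<T>(b.Value, f);
        }
        if (f == null)
            throw new InvalidOperationException("The queue is empty.");

        value = f.Value;
        return new FunctionalQueue<T>(f.Next, b);
    }
}
```

Each operation returns a new queue and leaves the old one untouched, which is what makes the structure a natural fit for append only storage.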

Our first task is implementing the list. Since we really only need it to behave as a stack, I won’t bother with general list functionality and will implement just a very simple stack:

public class Stack
{
    private readonly Stream reader;
    private readonly Stream writer;
    private StackItem current;
    private readonly BinaryWriterWith7BitEncoding binaryWriter;
    private readonly BinaryReaderWith7BitEncoding binaryReader;
    private readonly List<StackItem> unwritten = new List<StackItem>();

    private class StackItem
    {
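        // Position of this item's record in the file; set on flush or when read back.
        public long Position { get; set; }
        // Next item down the stack, when it is already in memory.
        public readonly StackItem NextItem;
        // File position of the data that was pushed.
        public readonly long Value;
        // File position of the next item's record, when it has to be read from disk.
        public readonly long? Next;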

        public StackItem(long value, StackItem nextItem)
        {
            Value = value;
            NextItem = nextItem;
        }

        public StackItem(long value, long? next)
        {
            Value = value;
            Next = next;
        }
    }

    public long? CurrentPosition
    {
        get
        {
            return current == null ? (long?)null : current.Position;
        }
    }

    public Stack(Stream reader, Stream writer, StartMode mode)
    {
        this.reader = reader;
        this.writer = writer;

        binaryReader = new BinaryReaderWith7BitEncoding(reader);
        binaryWriter = new BinaryWriterWith7BitEncoding(writer);

        if (mode != StartMode.Open)
            return;
        current = ReadStackItem(reader.Position);
    }

    public void Push(byte[] data)
    {
        var pos = writer.Position;
        binaryWriter.Write7BitEncodedInt(data.Length);
        binaryWriter.Write(data);

        PushInternal(pos);
    }

    private void PushInternal(long pos)
    {
        current = new StackItem(pos, current);
        unwritten.Add(current);
    }

    public byte[] Pop()
    {
        var result = PopInternal(ref current);

        if (result == null)
            return null;

        reader.Position = result.Value;
        var size = binaryReader.Read7BitEncodedInt();
        var bytes = binaryReader.ReadBytes(size);
        return bytes;
    }

    private long? PopInternal(ref StackItem item)
    {
        if (item == null)
            return null;
        unwritten.Remove(item);
        var result = item.Value;
            
        if (item.NextItem != null)
            item = item.NextItem;
        else if (item.Next != null)
            item = ReadStackItem(item.Next.Value);
        else
            item = null;

        return result;
    }

    public void Flush()
    {
        foreach (var stackItem in unwritten)
        {
            stackItem.Position = writer.Position;
            binaryWriter.WriteBitEncodedNullableInt64(stackItem.Value);
            binaryWriter.WriteBitEncodedNullableInt64(stackItem.NextItem != null ? stackItem.NextItem.Position : stackItem.Next);
        }
        unwritten.Clear();
    }

    private StackItem ReadStackItem(long position)
    {
        reader.Position = position;
        return new StackItem(
            binaryReader.ReadBitEncodedNullableInt64().Value,
            binaryReader.ReadBitEncodedNullableInt64()
            )
        {
            Position = position
        };
    }

    public Stack Reverse()
    {
        var stack = new Stack(reader, writer, StartMode.Create);
        var item = current;

        while(item != null)
        {
            var value = PopInternal(ref item);
            if(value!=null)
                stack.PushInternal(value.Value);
        }

        return stack;
    }
}

The code makes several assumptions:

  • The stream is using WriteThrough, so once a write has completed, it is saved to the disk.
  • It is not our responsibility to keep track of things like the current position; this is handled by higher level code.
  • We are allowed to jump around on the reader, but the writer is only doing appends.
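The stack also relies on BinaryWriterWith7BitEncoding and BinaryReaderWith7BitEncoding, which are not shown in this post. The following is only a sketch of how a nullable 64-bit position might be written and read with a 7-bit variable-length encoding. The class name NullableInt64Encoding and the layout (one flag byte, then the value in 7-bit groups) are assumptions made for illustration, not the actual format used by those classes.

```csharp
using System;
using System.IO;

public static class NullableInt64Encoding
{
    // Assumed layout: a flag byte (0 = null, 1 = value follows),
    // then the value in 7-bit groups, least significant group first.
    public static void Write(BinaryWriter writer, long? value)
    {
        if (value == null)
        {
            writer.Write((byte)0);
            return;
        }

        writer.Write((byte)1);
        var remaining = unchecked((ulong)value.Value);
        while (remaining >= 0x80)
        {
            // The high bit marks "more bytes follow".
            writer.Write(unchecked((byte)(remaining | 0x80)));
            remaining >>= 7;
        }
        writer.Write((byte)remaining);
    }

    public static long? Read(BinaryReader reader)
    {
        if (reader.ReadByte() == 0)
            return null;

        ulong result = 0;
        var shift = 0;
        while (true)
        {
            var b = reader.ReadByte();
            result |= (ulong)(b & 0x7F) << shift;
            if ((b & 0x80) == 0)
                return unchecked((long)result);
            shift += 7;
        }
    }
}
```

With an encoding along these lines, small file positions take only a couple of bytes, which matters because every stack item stores two of them.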

Given the stack behavior, we can now implement the queue:

public class Queue
{
    private readonly Stream reader;
    private readonly Stream writer;
    private Stack front;
    private Stack back;
    private readonly BinaryReaderWith7BitEncoding binaryReader;
    private readonly BinaryWriterWith7BitEncoding binaryWriter;

    public Queue(Stream reader, Stream writer, StartMode mode)
    {
        this.reader = reader;
        this.writer = writer;
        binaryReader = new BinaryReaderWith7BitEncoding(reader);
        binaryWriter = new BinaryWriterWith7BitEncoding(writer);

        switch (mode)
        {
            case StartMode.Open:
                ReadStacks();
                break;
            case StartMode.Create:
                InitializeEmptyStacks();
                break;
        }
    }

    private void InitializeEmptyStacks()
    {
        front = new Stack(reader, writer, StartMode.Create);
        back = new Stack(reader, writer, StartMode.Create);
    }

    private void ReadStacks()
    {
        var frontPos = binaryReader.ReadBitEncodedNullableInt64();
        var backPos = binaryReader.ReadBitEncodedNullableInt64();
        if (frontPos != null)
        {
            reader.Position = frontPos.Value;
            front = new Stack(reader, writer, StartMode.Open);
        }
        else
        {
            front = new Stack(reader, writer, StartMode.Create);
        }
        if (backPos != null)
        {
            reader.Position = backPos.Value;
            back = new Stack(reader, writer, StartMode.Open);
        }
        else
        {
            back = new Stack(reader, writer, StartMode.Create);
        }
    }

    public void Enqueue(byte[] data)
    {
        back.Push(data);
    }

    public byte[] Dequeue()
    {
        var result = front.Pop();
        if (result != null)
            return result;

        front = back.Reverse();
        back = new Stack(reader, writer, StartMode.Create);
        return front.Pop();
    }

    public void Flush()
    {
        front.Flush();
        back.Flush();

        QueuePosition = writer.Position;

        binaryWriter.WriteBitEncodedNullableInt64(front.CurrentPosition);
        binaryWriter.WriteBitEncodedNullableInt64(back.CurrentPosition);
    }

    public long QueuePosition { get; private set; }
}

Now, just to make things interesting, let us see how it actually performs:

var sp = Stopwatch.StartNew();
using (var writer = new FileStream("data",
    FileMode.OpenOrCreate,
    FileAccess.ReadWrite,
    FileShare.Delete | FileShare.Read,
    16 * 1024, 
    FileOptions.SequentialScan))
using (var reader = new FileStream("data", 
    FileMode.Open, 
    FileAccess.Read, 
    FileShare.ReadWrite | FileShare.Delete, 
    16 * 1024, 
    FileOptions.RandomAccess))
{
    Queue queue = new Queue(reader, writer, StartMode.Create);

    var bytes = new byte[1024*7];
    new Random().NextBytes(bytes);
    for (int i = 0; i < 100000; i++)
    {
        queue.Enqueue(bytes);
    }
    queue.Flush();
    writer.Flush(true);
}
Console.WriteLine(sp.ElapsedMilliseconds);

And this completes in a bit over 14 seconds, or over seven thousand (pretty big) items per second.