Building a managed persistent, transactional, queue

When building transactional systems, there are two main approaches one can take:

  • Transaction log
  • Append only

In both cases, we rely on a very important property: fsync. fsync is actually a unix call which flushes all data to the actual device. In essence, it means “write to the actual hardware and don’t return until the hardware has confirmed that the data was saved”. In Windows, the equivalents are the FlushFileBuffers call and the WriteThrough flag. Opening the file with WriteThrough is better if you would otherwise need to call FlushFileBuffers after every single write, while FlushFileBuffers is significantly faster if you only need to flush once in a while.

FileStream.Flush(true) in .NET 4.0 translates to a call to FlushFileBuffers.
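
To make the difference concrete, here is a minimal sketch (mine, just to illustrate the two options; the file name and buffer are placeholders):

var buffer = new byte[4096]; // whatever we want to persist

// Option 1: open the file with WriteThrough, so the OS does not cache the writes.
using (var fs = new FileStream("data", FileMode.OpenOrCreate, FileAccess.Write,
    FileShare.None, 16 * 1024, FileOptions.WriteThrough))
{
    fs.Write(buffer, 0, buffer.Length);
    fs.Flush(); // push FileStream's own buffer to the OS; WriteThrough takes it to the device
}

// Option 2: write normally and flush to the device only when durability is actually needed.
using (var fs = new FileStream("data", FileMode.OpenOrCreate, FileAccess.Write))
{
    fs.Write(buffer, 0, buffer.Length);
    fs.Flush(true); // .NET 4.0: flushToDisk == true translates to FlushFileBuffers
}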

Transaction log systems tend to be more complex than append only systems, but append only systems use more space. Space tends to be pretty cheap, so I think that is a good tradeoff.

Given that, let us see how we can build a managed transactional persistent queue. One interesting aspect is that append only, just like immutable data structures, is really not friendly for things like queues. The problem is that adding an item to the queue requires modifying the entire queue, which results in a large number of memory allocations / writes to disk.

The functional crowd solved this problem long ago, it seems. In essence, a functional queue is composed of two immutable lists, one for the front of the queue and the other for the back of the queue. You add things to the back of the queue, and pop things from the front of the queue. When the front of the queue is empty, you set the front of the queue to be the reversed back of the queue and clear the back. The sketch below should make it clear how this works.
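
Here is a rough, purely in-memory illustration of the idea (just a sketch to show the shape of the algorithm, not part of the actual implementation); every operation returns a new queue instead of mutating the old one:

public sealed class FunctionalQueue<T>
{
    private sealed class Node
    {
        public readonly T Value;
        public readonly Node Next;
        public Node(T value, Node next) { Value = value; Next = next; }
    }

    public static readonly FunctionalQueue<T> Empty = new FunctionalQueue<T>(null, null);

    private readonly Node front; // items are popped from here
    private readonly Node back;  // items are pushed here, newest first

    private FunctionalQueue(Node front, Node back)
    {
        this.front = front;
        this.back = back;
    }

    // Enqueue never touches existing nodes, it only allocates a single new one.
    public FunctionalQueue<T> Enqueue(T value)
    {
        return new FunctionalQueue<T>(front, new Node(value, back));
    }

    // Dequeue pops from the front; when the front is empty, the back is
    // reversed into a new front and the back is cleared.
    public FunctionalQueue<T> Dequeue(out T value)
    {
        var f = front;
        var b = back;
        if (f == null)
        {
            while (b != null)
            {
                f = new Node(b.Value, f);
                b = b.Next;
            }
        }
        if (f == null)
            throw new InvalidOperationException("The queue is empty");
        value = f.Value;
        return new FunctionalQueue<T>(f.Next, b);
    }
}

Enqueueing is a single allocation, and the cost of reversing the back list is spread over all the items that were enqueued, so the queue still behaves in amortized constant time per operation.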

Our first task is actually implementing the list. Since we really only need it to behave like a stack, I won't bother with full list functionality and will just implement a very simple stack:

public class Stack
{
    private readonly Stream reader;
    private readonly Stream writer;
    private StackItem current;
    private readonly BinaryWriterWith7BitEncoding binaryWriter;
    private readonly BinaryReaderWith7BitEncoding binaryReader;
    private readonly List<StackItem> unwritten = new List<StackItem>();

    // A stack item records where its value lives in the data file. The link to the next
    // item is either in memory (NextItem, for items not yet flushed) or on disk (Next,
    // the file position of the next item's record).
    private class StackItem
    {
        public long Position { get; set; }
        public readonly StackItem NextItem;
        public readonly long Value;
        public readonly long? Next;

        public StackItem(long value, StackItem nextItem)
        {
            Value = value;
            NextItem = nextItem;
        }

        public StackItem(long value, long? next)
        {
            Value = value;
            Next = next;
        }
    }

    public long? CurrentPosition
    {
        get
        {
            return current == null ? (long?)null : current.Position;
        }
    }

    public Stack(Stream reader, Stream writer, StartMode mode)
    {
        this.reader = reader;
        this.writer = writer;

        binaryReader = new BinaryReaderWith7BitEncoding(reader);
        binaryWriter = new BinaryWriterWith7BitEncoding(writer);

        if (mode != StartMode.Open)
            return;
        current = ReadStackItem(reader.Position);
    }

    // Appends the value to the data file and makes its position the new head of the stack.
    public void Push(byte[] data)
    {
        var pos = writer.Position;
        binaryWriter.Write7BitEncodedInt(data.Length);
        binaryWriter.Write(data);

        PushInternal(pos);
    }

    private void PushInternal(long pos)
    {
        current = new StackItem(pos, current);
        unwritten.Add(current);
    }

    // Removes the current head of the stack and reads its value back from the data file.
    public byte[] Pop()
    {
        var result = PopInternal(ref current);

        if (result == null)
            return null;

        reader.Position = result.Value;
        var size = binaryReader.Read7BitEncodedInt();
        var bytes = binaryReader.ReadBytes(size);
        return bytes;
    }

    private long? PopInternal(ref StackItem item)
    {
        if (item == null)
            return null;
        unwritten.Remove(item);
        var result = item.Value;
            
        if (item.NextItem != null)
            item = item.NextItem;
        else if (item.Next != null)
            item = ReadStackItem(item.Next.Value);
        else
            item = null;

        return result;
    }

    // Writes all the unwritten stack items to the file, assigning each one its on-disk position.
    public void Flush()
    {
        foreach (var stackItem in unwritten)
        {
            stackItem.Position = writer.Position;
            binaryWriter.WriteBitEncodedNullableInt64(stackItem.Value);
            binaryWriter.WriteBitEncodedNullableInt64(stackItem.NextItem != null ? stackItem.NextItem.Position : stackItem.Next);
        }
        unwritten.Clear();
    }

    private StackItem ReadStackItem(long position)
    {
        reader.Position = position;
        return new StackItem(
            binaryReader.ReadBitEncodedNullableInt64().Value,
            binaryReader.ReadBitEncodedNullableInt64()
            )
        {
            Position = position
        };
    }

    // Pops everything from this stack and pushes it onto a new one, reversing the order.
    // This is what turns the back of the queue into its front.
    public Stack Reverse()
    {
        var stack = new Stack(reader, writer, StartMode.Create);
        var item = current;

        while(item != null)
        {
            var value = PopInternal(ref item);
            if(value!=null)
                stack.PushInternal(value.Value);
        }

        return stack;
    }
}

The code makes several assumptions:

  • The stream is using WriteThrough, so once a write has completed, it is saved to the disk.
  • It is not our responsibility to keep track of things like the current position; that is handled by higher level code.
  • We are allowed to jump around on the reader, but the writer only does appends.
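
The BinaryReaderWith7BitEncoding and BinaryWriterWith7BitEncoding helpers are not shown here; they just inherit from BinaryReader / BinaryWriter and expose the 7-bit variable length encoding, plus a nullable variant. As a rough sketch of what they might look like, assuming nulls are marked by a reserved sentinel (0, with real values shifted up by one, which works because file positions are never negative):

public class BinaryWriterWith7BitEncoding : BinaryWriter
{
    public BinaryWriterWith7BitEncoding(Stream output) : base(output)
    {
    }

    // Expose the protected 7-bit encoding that BinaryWriter already implements.
    public new void Write7BitEncodedInt(int value)
    {
        base.Write7BitEncodedInt(value);
    }

    // Assumption: null is written as 0 and a real value as value + 1.
    public void WriteBitEncodedNullableInt64(long? value)
    {
        WriteVarInt64(value == null ? 0UL : (ulong)value.Value + 1);
    }

    private void WriteVarInt64(ulong value)
    {
        // 7 bits of payload per byte, high bit set while more bytes follow.
        while (value >= 0x80)
        {
            Write((byte)(value | 0x80));
            value >>= 7;
        }
        Write((byte)value);
    }
}

public class BinaryReaderWith7BitEncoding : BinaryReader
{
    public BinaryReaderWith7BitEncoding(Stream input) : base(input)
    {
    }

    public new int Read7BitEncodedInt()
    {
        return base.Read7BitEncodedInt();
    }

    public long? ReadBitEncodedNullableInt64()
    {
        var value = ReadVarInt64();
        return value == 0 ? (long?)null : (long)(value - 1);
    }

    private ulong ReadVarInt64()
    {
        ulong value = 0;
        var shift = 0;
        byte b;
        do
        {
            b = ReadByte();
            value |= (ulong)(b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }
}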

Given the stack behavior, we can now implement the queue:

public class Queue
{
    private readonly Stream reader;
    private readonly Stream writer;
    private Stack front;
    private Stack back;
    private readonly BinaryReaderWith7BitEncoding binaryReader;
    private readonly BinaryWriterWith7BitEncoding binaryWriter;

    public Queue(Stream reader, Stream writer, StartMode mode)
    {
        this.reader = reader;
        this.writer = writer;
        binaryReader = new BinaryReaderWith7BitEncoding(reader);
        binaryWriter = new BinaryWriterWith7BitEncoding(writer);

        switch (mode)
        {
            case StartMode.Open:
                ReadStacks();
                break;
            case StartMode.Create:
                InitializeEmptyStacks();
                break;
        }
    }

    private void InitializeEmptyStacks()
    {
        front = new Stack(reader, writer, StartMode.Create);
        back = new Stack(reader, writer, StartMode.Create);
    }

    private void ReadStacks()
    {
        var frontPos = binaryReader.ReadBitEncodedNullableInt64();
        var backPos = binaryReader.ReadBitEncodedNullableInt64();
        if (frontPos != null)
        {
            reader.Position = frontPos.Value;
            front = new Stack(reader, writer, StartMode.Open);
        }
        else
        {
            front = new Stack(reader, writer, StartMode.Create);
        }
        if (backPos != null)
        {
            reader.Position = backPos.Value;
            back = new Stack(reader, writer, StartMode.Open);
        }
        else
        {
            back = new Stack(reader, writer, StartMode.Create);
        }
    }

    public void Enqueue(byte[] data)
    {
        back.Push(data);
    }

    // Pops from the front; when the front is empty, the back is reversed into a new front first.
    public byte[] Dequeue()
    {
        var result = front.Pop();
        if (result != null)
            return result;

        front = back.Reverse();
        back = new Stack(reader, writer, StartMode.Create);
        return front.Pop();
    }

    // Flushes both stacks, then writes the queue header: the positions of the two stack heads.
    public void Flush()
    {
        front.Flush();
        back.Flush();

        QueuePosition = writer.Position;

        binaryWriter.WriteBitEncodedNullableInt64(front.CurrentPosition);
        binaryWriter.WriteBitEncodedNullableInt64(back.CurrentPosition);
    }

    public long QueuePosition { get; private set; }
}

Now, just to make things interesting, let us see what this actually means in terms of performance:

var sp = Stopwatch.StartNew();
using (var writer = new FileStream("data",
    FileMode.OpenOrCreate,
    FileAccess.ReadWrite,
    FileShare.Delete | FileShare.Read,
    16 * 1024, 
    FileOptions.SequentialScan))
using (var reader = new FileStream("data", 
    FileMode.Open, 
    FileAccess.Read, 
    FileShare.ReadWrite | FileShare.Delete, 
    16 * 1024, 
    FileOptions.RandomAccess))
{
    Queue queue = new Queue(reader, writer, StartMode.Create);

    var bytes = new byte[1024*7];
    new Random().NextBytes(bytes);
    for (int i = 0; i < 100000; i++)
    {
        queue.Enqueue(bytes);
    }
    queue.Flush();
    writer.Flush(true);
}
Console.WriteLine(sp.ElapsedMilliseconds);

And this completes in a bit over 14 seconds, or over seven thousand (pretty big) items per second.
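
The benchmark only enqueues, and it never records where the queue header ended up. Reading the data back later is the job of the higher level code, which has to remember the last flushed QueuePosition somewhere and restore the reader to it before opening the queue. A rough sketch of what that might look like, where ReadPositionFromSideFile and SavePositionToSideFile are hypothetical helpers for persisting that single number:

// reader and writer are FileStream instances over the same file, opened as in the benchmark above.
long lastQueuePosition = ReadPositionFromSideFile(); // hypothetical helper
reader.Position = lastQueuePosition;
var queue = new Queue(reader, writer, StartMode.Open);

byte[] item;
while ((item = queue.Dequeue()) != null)
{
    // process item ...
}

queue.Flush();      // persist the new front / back stack heads
writer.Flush(true); // and make sure they actually reached the disk
SavePositionToSideFile(queue.QueuePosition); // hypothetical helper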

Comments

Rafal - 06/17/2010 12:41 PM
  1. Is it multithreaded? What about two threads writing to the same file at once?

  2. What about the size of queue file? Won't it be too big?

Tim - 06/17/2010 01:24 PM

Very interesting... I've been reading this article a couple of times now, but I keep wondering:

The two binary reader and writer classes you are new'ing up several times in your code: where's the code for the implementation of these (BinaryReaderWith7BitEncoding & BinaryWriterWith7BitEncoding)?

Reason I'm asking is that I'm uncertain what a method like WriteBitEncodedNullableInt64(long? value) would write to disk.

Corey - 06/17/2010 01:29 PM

I always love your posts that explain how to do something but don't give context as to why you are doing it. Where are you going with this?

William - 06/17/2010 02:14 PM

@Cory

Not sure myself but maybe RavenDB is going to have a Transaction Log support

Ayende Rahien - 06/17/2010 02:35 PM

Rafal,

1) no, this is not multi threaded. In general, both transaction log & append only need to ensure consistent flushes, so they serialize writes to the transaction log / append only stream.

2) It is actually pretty efficient. The perf test produced a file that was 701MB in size, with the raw data being 683 MB of that.

We are talking about 3% overhead, which is more than reasonable, I think.

Ayende Rahien - 06/17/2010 02:36 PM

Tim,

The binary reader / writer just inherit from BinaryWriter and BinaryReader and do the 7 bit encoding, that is all.

You can look at how they do that in Reflector: BinaryWriter.Write7BitEncodedInt.

Ayende Rahien - 06/17/2010 02:36 PM

Corey & William,

This is useful for the managed storage option for Raven.

Ayende Rahien - 06/17/2010 02:37 PM

Corey & William,

This is useful for the managed storage option for Raven.

William,

I think you meant, RavenDB is going to have Append Only support

Tim - 06/17/2010 02:42 PM

Thanks for the reply. I've found the answer to writing integers with 7 bit encoding... but still the question remains:

"Reason I'm asking is that I'm uncertain what a method like WriteBitEncodedNullableInt64(long? value) would write to disk."

What do you write if the value is null?

Ayende Rahien - 06/17/2010 02:54 PM

Tim,

A special value is written if the value is null.

MikeS - 06/18/2010 11:56 AM

Nice, I like the speed :-)

I'm curious whether you considered versioning as an alternative solution? I believe Interbase was the first RDBMS to introduce it - they call it Multi-Generational Architecture. It allows readers not to block and writers not to block readers. I don't know whether it would apply to the requirements for which you built this solution, but it's an interesting alternative to logs and append-only WRT implementing transactional systems.

Ayende Rahien - 06/18/2010 12:35 PM

Mike,

Any more information about that?

Ayende Rahien - 06/18/2010 12:36 PM

FWIW,

This method means that readers don't block writers and writers don't block readers, but writers block writers.

MikeS - 06/18/2010 01:31 PM

Ayende,

True, writers block writers but that's a lot better than writers blocking readers as well.

en.wikipedia.org/.../InterBase
en.wikipedia.org/.../Multiversionconcurrencyc...

It's one of the things that attracted me to Interbase years ago and then Firebird after Borland open-sourced the code. Not requiring locking allows cheap serializable isolation. There has to be some garbage collection for versions that no longer have active transactions.

Oracle added it in v3 and SQL Server finally got around to it in 2005.

MikeS - 06/18/2010 01:36 PM

Correction: I should have said snapshot isolation, not serializable isolation.

Ayende Rahien - 06/18/2010 02:04 PM

MikeS,

Yep, that is pretty much what is going on there.

In the code here, we pretty much have this situation. Readers & Writers operate independently from one another.

We don't have to worry about a thing.

The actual name for that is MVCC (multi version concurrency control). Append only systems that support more than a single thread are naturally MVCC and support this natively.

MikeS - 06/18/2010 02:34 PM

So, reading between the lines, does this mean that RavenDB supports or will soon support highly performant snapshot isolation? That would be very cool - and in line with Couch & Mongo.

Demis Bellot - 06/23/2010 02:02 AM

Very interesting perspective indeed.

Out of curiosity, why would you go with the dual 'front and back' lists approach over, say, a doubly linked list, e.g. the C# LinkedList collection type?

I would've thought that a double-linked list is the perfect collection type for adding items to the head and tail of a list in constant time.

Ayende Rahien - 06/23/2010 10:27 AM

Demis,

The problem is simple: a doubly linked list requires you to have a mutable data structure.

I am trying to do this in an immutable, append only, manner
