Building a managed, persistent, transactional queue
When it comes to building transactional systems, there are two main approaches:
- Transaction log
- Append only
In both cases, we rely on a very important property: fsync. fsync is a Unix call which flushes all buffered data to the actual device. In essence, it means “write to the actual hardware and don’t return until the hardware has confirmed that the data was saved”. On Windows, the equivalent is FlushFileBuffers, or opening the file with WriteThrough. WriteThrough is better if you would otherwise need to call FlushFileBuffers on every single write, while FlushFileBuffers is significantly faster if you only need to call it once in a while.
FileStream.Flush(true) in .NET 4.0 translates to FlushFileBuffers.
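To make the difference concrete, here is a minimal sketch (the file name and buffer are just placeholders) showing the two options in .NET:

var buffer = new byte[1024];

// Option 1: open the file with WriteThrough - every write the OS completes
// has already been handed to the device, so no explicit flush is needed.
using (var writeThrough = new FileStream("data", FileMode.OpenOrCreate, FileAccess.Write,
    FileShare.Read, 16 * 1024, FileOptions.WriteThrough))
{
    writeThrough.Write(buffer, 0, buffer.Length);
}

// Option 2: buffer normally, and force FlushFileBuffers only when we need a durable point.
using (var buffered = new FileStream("data", FileMode.OpenOrCreate, FileAccess.Write,
    FileShare.Read, 16 * 1024, FileOptions.None))
{
    buffered.Write(buffer, 0, buffer.Length);
    buffered.Flush(true); // on .NET 4.0 this calls FlushFileBuffers
}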
Transaction log systems tend to be more complex than append only systems, but append only systems use more space. Space tends to be pretty cheap, so I think that is a good tradeoff.
Given that, let us see how we can build a managed transactional persistent queue. One interesting aspect is that append only, just like immutable data structures, is really not friendly to things like queues. The problem is that adding an item to the queue requires modifying the entire queue, which results in a large number of memory allocations / writes to disk.
The functional crowd solved this problem long ago, it seems. In essence, a functional queue is composed of two immutable lists, one for the front of the queue and the other for the back. You add things to the back of the queue and pop things from the front. When the front of the queue is empty, you set the front to be the reversed back and clear the back. The link should make it clear how this works.
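To make the mechanics concrete, here is a tiny in-memory sketch of the two stack queue (this one uses mutable Stack&lt;T&gt; instances from System.Collections.Generic purely for illustration; the persistent version below uses immutable, append only stacks instead):

public class InMemoryTwoStackQueue<T>
{
    private readonly Stack<T> front = new Stack<T>(); // items are popped from here
    private readonly Stack<T> back = new Stack<T>();  // items are pushed onto here

    public void Enqueue(T item)
    {
        back.Push(item);
    }

    public T Dequeue()
    {
        if (front.Count == 0)
        {
            // The front is empty, so move the back over. Popping reverses the order,
            // which is exactly what turns the LIFO pair into a FIFO queue.
            while (back.Count > 0)
                front.Push(back.Pop());
        }
        return front.Pop(); // throws if the queue is empty
    }
}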
Our first task is implementing the list. Since we really only need it to be a stack, I won’t bother with full list functionality and will just implement a very simple, persistent stack:
public class Stack
{
    private readonly Stream reader;
    private readonly Stream writer;
    private StackItem current;
    private readonly BinaryWriterWith7BitEncoding binaryWriter;
    private readonly BinaryReaderWith7BitEncoding binaryReader;
    private readonly List<StackItem> unwritten = new List<StackItem>();

    private class StackItem
    {
        public long Position { get; set; }
        public readonly StackItem NextItem;
        public readonly long Value;
        public readonly long? Next;

        public StackItem(long value, StackItem nextItem)
        {
            Value = value;
            NextItem = nextItem;
        }

        public StackItem(long value, long? next)
        {
            Value = value;
            Next = next;
        }
    }

    public long? CurrentPosition
    {
        get { return current == null ? (long?)null : current.Position; }
    }

    public Stack(Stream reader, Stream writer, StartMode mode)
    {
        this.reader = reader;
        this.writer = writer;
        binaryReader = new BinaryReaderWith7BitEncoding(reader);
        binaryWriter = new BinaryWriterWith7BitEncoding(writer);
        if (mode != StartMode.Open)
            return;
        current = ReadStackItem(reader.Position);
    }

    public void Push(byte[] data)
    {
        var pos = writer.Position;
        binaryWriter.Write7BitEncodedInt(data.Length);
        binaryWriter.Write(data);
        PushInternal(pos);
    }

    private void PushInternal(long pos)
    {
        current = new StackItem(pos, current);
        unwritten.Add(current);
    }

    public byte[] Pop()
    {
        var result = PopInternal(ref current);
        if (result == null)
            return null;
        reader.Position = result.Value;
        var size = binaryReader.Read7BitEncodedInt();
        var bytes = binaryReader.ReadBytes(size);
        return bytes;
    }

    private long? PopInternal(ref StackItem item)
    {
        if (item == null)
            return null;
        unwritten.Remove(item);
        var result = item.Value;
        if (item.NextItem != null)
            item = item.NextItem;
        else if (item.Next != null)
            item = ReadStackItem(item.Next.Value);
        else
            item = null;
        return result;
    }

    public void Flush()
    {
        foreach (var stackItem in unwritten)
        {
            stackItem.Position = writer.Position;
            binaryWriter.WriteBitEncodedNullableInt64(stackItem.Value);
            binaryWriter.WriteBitEncodedNullableInt64(stackItem.NextItem != null
                ? stackItem.NextItem.Position
                : stackItem.Next);
        }
        unwritten.Clear();
    }

    private StackItem ReadStackItem(long position)
    {
        reader.Position = position;
        return new StackItem(
            binaryReader.ReadBitEncodedNullableInt64().Value,
            binaryReader.ReadBitEncodedNullableInt64()
        )
        {
            Position = position
        };
    }

    public Stack Reverse()
    {
        var stack = new Stack(reader, writer, StartMode.Create);
        var item = current;
        while (item != null)
        {
            var value = PopInternal(ref item);
            if (value != null)
                stack.PushInternal(value.Value);
        }
        return stack;
    }
}
The code makes several assumptions:
- The stream is using WriteThrough, so once a write has completed, it is saved to the disk (see the sketch after this list for one way to open the streams like that).
- It is not our responsibility to keep track of things like the current position; this is handled by higher level code.
- We are allowed to jump around on the reader, but the writer is only doing appends.
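For illustration, here is one way the two streams could be opened so that these assumptions hold. The parameters mirror the benchmark further down, except that the writer is opened with WriteThrough (the benchmark skips WriteThrough and instead calls Flush(true) once at the end):

// Append only writer: WriteThrough means a completed write is already on the device.
var writer = new FileStream("data", FileMode.OpenOrCreate, FileAccess.ReadWrite,
    FileShare.Delete | FileShare.Read, 16 * 1024, FileOptions.WriteThrough);
writer.Position = writer.Length; // we only ever append to the writer

// Reader over the same file: free to jump around.
var reader = new FileStream("data", FileMode.Open, FileAccess.Read,
    FileShare.ReadWrite | FileShare.Delete, 16 * 1024, FileOptions.RandomAccess);

var stack = new Stack(reader, writer, StartMode.Create);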
Given the stack behavior, we can now implement the queue:
public class Queue
{
    private readonly Stream reader;
    private readonly Stream writer;
    private Stack front;
    private Stack back;
    private readonly BinaryReaderWith7BitEncoding binaryReader;
    private readonly BinaryWriterWith7BitEncoding binaryWriter;

    public Queue(Stream reader, Stream writer, StartMode mode)
    {
        this.reader = reader;
        this.writer = writer;
        binaryReader = new BinaryReaderWith7BitEncoding(reader);
        binaryWriter = new BinaryWriterWith7BitEncoding(writer);
        switch (mode)
        {
            case StartMode.Open:
                ReadStacks();
                break;
            case StartMode.Create:
                InitializeEmptyStacks();
                break;
        }
    }

    private void InitializeEmptyStacks()
    {
        front = new Stack(reader, writer, StartMode.Create);
        back = new Stack(reader, writer, StartMode.Create);
    }

    private void ReadStacks()
    {
        var frontPos = binaryReader.ReadBitEncodedNullableInt64();
        var backPos = binaryReader.ReadBitEncodedNullableInt64();
        if (frontPos != null)
        {
            reader.Position = frontPos.Value;
            front = new Stack(reader, writer, StartMode.Open);
        }
        else
        {
            front = new Stack(reader, writer, StartMode.Create);
        }
        if (backPos != null)
        {
            reader.Position = backPos.Value;
            back = new Stack(reader, writer, StartMode.Open);
        }
        else
        {
            back = new Stack(reader, writer, StartMode.Create);
        }
    }

    public void Enqueue(byte[] data)
    {
        back.Push(data);
    }

    public byte[] Dequeue()
    {
        var result = front.Pop();
        if (result != null)
            return result;
        front = back.Reverse();
        back = new Stack(reader, writer, StartMode.Create);
        return front.Pop();
    }

    public void Flush()
    {
        front.Flush();
        back.Flush();
        QueuePosition = writer.Position;
        binaryWriter.WriteBitEncodedNullableInt64(front.CurrentPosition);
        binaryWriter.WriteBitEncodedNullableInt64(back.CurrentPosition);
    }

    public long QueuePosition { get; private set; }
}
Now, just to make things interesting, let us see what this actually means in practice:
var sp = Stopwatch.StartNew();
using (var writer = new FileStream("data", FileMode.OpenOrCreate, FileAccess.ReadWrite,
    FileShare.Delete | FileShare.Read, 16 * 1024, FileOptions.SequentialScan))
using (var reader = new FileStream("data", FileMode.Open, FileAccess.Read,
    FileShare.ReadWrite | FileShare.Delete, 16 * 1024, FileOptions.RandomAccess))
{
    Queue queue = new Queue(reader, writer, StartMode.Create);
    var bytes = new byte[1024 * 7];
    new Random().NextBytes(bytes);
    for (int i = 0; i < 100000; i++)
    {
        queue.Enqueue(bytes);
    }
    queue.Flush();
    writer.Flush(true);
}
Console.WriteLine(sp.ElapsedMilliseconds);
And this completes in a bit over 14 seconds, or over seven thousand (pretty big) items per second.
Comments
Is it multithreaded? What about two threads writing to the same file at once?
What about the size of queue file? Won't it be too big?
Very interesting... I've been reading this article a couple of times now but I keep wondering:
The two binary reader and writer classes you are new'ing up several times in your code: where's the code for the implementation of these (BinaryReaderWith7BitEncoding & BinaryWriterWith7BitEncoding)?
Reason I'm asking is that I'm uncertain what a method like WriteBitEncodedNullableInt64(long? value) would write to disk.
I always love your posts that explain how to do something but don't give context as to why you are doing it. Where are you going with this?
@Cory
Not sure myself, but maybe RavenDB is going to have Transaction Log support
Rafal,
1) No, this is not multithreaded. In general, both transaction log & append only systems need to ensure consistent flushes, so they serialize writes to the transaction log / append only stream.
2) It is actually pretty efficient. The perf test produced a file that was 701 MB in size, with the raw data being 683 MB of that.
We are talking about 3% overhead, which is more than reasonable, I think.
Tim,
The binary reader / writer just inherit BinaryReader and BinaryWriter and do 7 bit encoding, that is all.
You can see how they do that in Reflector: BinaryWriter.Write7BitEncodedInt
Corey & William,
This is useful for the managed storage option for Raven.
Corey & William,
This is useful for the managed storage option for Raven.
William,
I think you meant that RavenDB is going to have Append Only support
Thanks for the reply. I've found the answer to writing integers with 7 bit encoding... but still the question remains:
"Reason I'm asking is that I'm uncertain what a method like WriteBitEncodedNullableInt64(long? value) would write to disk."
What do you write if the value is null?
Tim,
A special value is written if the value is null.
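For anyone curious, here is one possible sketch of the two classes. This is my guess at an implementation, not necessarily the exact code used in the post: nulls are written as a single zero byte and real values are shifted up by one, so that position 0 stays representable. Together with a StartMode enum (Open / Create), something like this should be enough to compile the Stack and Queue code above:

public enum StartMode
{
    Open,
    Create
}

public class BinaryWriterWith7BitEncoding : BinaryWriter
{
    public BinaryWriterWith7BitEncoding(Stream output) : base(output) { }

    // Expose the protected BinaryWriter.Write7BitEncodedInt to callers.
    public new void Write7BitEncodedInt(int value)
    {
        base.Write7BitEncodedInt(value);
    }

    public void WriteBitEncodedNullableInt64(long? value)
    {
        // null -> 0, otherwise value + 1, written as a 7 bit encoded integer
        WriteVarLong(value == null ? 0UL : (ulong)value.Value + 1);
    }

    private void WriteVarLong(ulong value)
    {
        while (value >= 0x80)
        {
            Write((byte)(value | 0x80));
            value >>= 7;
        }
        Write((byte)value);
    }
}

public class BinaryReaderWith7BitEncoding : BinaryReader
{
    public BinaryReaderWith7BitEncoding(Stream input) : base(input) { }

    // Expose the protected BinaryReader.Read7BitEncodedInt to callers.
    public new int Read7BitEncodedInt()
    {
        return base.Read7BitEncodedInt();
    }

    public long? ReadBitEncodedNullableInt64()
    {
        var value = ReadVarLong();
        return value == 0 ? (long?)null : (long)(value - 1);
    }

    private ulong ReadVarLong()
    {
        ulong result = 0;
        var shift = 0;
        byte current;
        do
        {
            current = ReadByte();
            result |= (ulong)(current & 0x7F) << shift;
            shift += 7;
        } while ((current & 0x80) != 0);
        return result;
    }
}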
Nice, I like the speed :-)
I'm curious whether you considered versioning as an alternative solution? I believe Interbase was the first RDBMS to introduce it - they call it Multi-Generational Architecture. It allows readers not to block writers and writers not to block readers. I don't know whether it would apply to the requirements for which you built this solution, but it's an interesting alternative to logs and append-only WRT implementing transactional systems.
Mike,
Any more information about that?
FWIW,
This method means that readers don't block writers and writers don't block readers, but writers do block writers.
Ayende,
True, writers block writers but that's a lot better than writers blocking readers as well.
en.wikipedia.org/.../InterBase
en.wikipedia.org/.../Multiversion_concurrency_c...
It's one of the things that attracted me to Interbase years ago and then Firebird after Borland open-sourced the code. Not requiring locking allows cheap serializable isolation. There has to be some garbage collection for versions that no longer have active transactions.
Oracle added it in v3 and SQL Server finally got around to it in 2005.
Correction: I should have said snapshot isolation, not serializable isolation.
MikeS,
Yep, that is pretty much what is going on there.
In the code here, we pretty much have this situation. Readers & Writers operate independently from one another.
We don't have to worry about a thing.
The actual name for that is MVCC (multi version concurrency control); append only systems that support more than a single thread are naturally MVCC and support this natively.
So, reading between the lines, does this mean that RavenDB supports or will soon support highly performant snapshot isolation? That would be very cool - and in line with Couch & Mongo.
MikeS,
Yes, it would
Very interesting perspective indeed.
Out of curiosity, why would you go with the dual 'front and back' lists approach over, say, a doubly-linked list, e.g. the C# LinkedList collection type?
I would've thought that a doubly-linked list is the perfect collection type for adding items to the head and tail of a list in constant time.
Demis,
The problem is simple: a doubly linked list requires a mutable data structure.
I am trying to do this in an immutable, append only, manner.
Highly unlikely