Rhino.Queues.Storage.Disk


The first thing you should know is that the code for this is here: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/Rhino.Queues.Storage.Disk

After my disastrous attempt to get an off-the-shelf embedded storage that can actually handle multiple threads with halfway decent performance, I decided that I really have to write my own.

I am pointing this out to all the people who think that my first instinct is to write my own.  Here is the API that it has:

using (var queue = new PersistentQueue(path))
{
	using (var session = queue.OpenSession())
	{
		session.Enqueue(new byte[] { 1, 2, 3, 4 });
		session.Flush();
	}

	using (var session = queue.OpenSession())
	{
		CollectionAssert.AreEqual(new byte[] { 1, 2, 3, 4 }, session.Dequeue());
		session.Flush();
	}
}

If it reminds you of something, that is intentional. :-)

PersistentQueue is a singleton that lets you create sessions. Sessions are thread local and contain the API to Enqueue and Dequeue from the queue. You can see the Flush() call there. This call persists the actual enqueuing or dequeuing; if we do not call it, our changes are reverted.

One of the design decisions that I made for this project was transaction support. That is, it should support ACID. This is not a trivial matter to decide to implement, but I decided that I wanted to try it.

Let me describe how it works. The persistent queue uses three types of files:

  • Transaction log
  • Data files
  • Lock file

The data file is simply where I place the data from the user, nothing more. Basically, here is how it looks:

image

There are several optimizations in the way that I write to the data file. First, writing is always done in a sequential manner, and no editing of the file is ever done. That is, once something is in the file, it is never touched (except for reads, of course). This allows me to maintain very high performance when writing to the file. Combining writes and deciding when to use async I/O vs. sync I/O also has a significant effect on performance.

Of course, because I never edit the file, I need some way to deal with data overflow. Once the file exceeds a predefined limit, I stop writing to it and start writing to a new file. Once all the items have been read from a file, and no more can be written to it, I delete it. This is a very simple strategy for managing the actual data. In fact, we use the current file size to size the next file, a simple heuristic that hopefully avoids fragmentation.
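To make the rollover rule concrete, here is a minimal sketch of an append-only writer that switches to a new file once the current one passes a size limit. This is not the code from the repository: the class name RollingDataWriter, the "data.N" file naming and the maxFileSize parameter are all assumptions made for illustration. Deleting fully read files and pre-sizing the next file are left out.

```csharp
using System;
using System.IO;

// Hypothetical sketch of the append-only, roll-over-on-limit strategy.
// Names and file layout are assumptions, not the actual Rhino.Queues code.
public sealed class RollingDataWriter : IDisposable
{
    private readonly string directory;
    private readonly long maxFileSize;
    private FileStream current;
    private long written; // bytes written to the current file so far

    public RollingDataWriter(string directory, long maxFileSize)
    {
        this.directory = directory;
        this.maxFileSize = maxFileSize;
        Directory.CreateDirectory(directory);
        OpenFile(0);
    }

    public int CurrentFileNumber { get; private set; }

    // Appends the payload and returns the offset it was written at.
    // Existing bytes are never modified; we only ever add to the end.
    public long Append(byte[] data)
    {
        if (written >= maxFileSize)
        {
            // The current file passed the limit: stop writing to it
            // and continue in a brand new file.
            current.Dispose();
            OpenFile(CurrentFileNumber + 1);
        }
        long start = written;
        current.Write(data, 0, data.Length);
        current.Flush();
        written += data.Length;
        return start;
    }

    private void OpenFile(int number)
    {
        CurrentFileNumber = number;
        written = 0;
        string path = Path.Combine(directory, "data." + number);
        current = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.Read);
    }

    public void Dispose()
    {
        current.Dispose();
    }
}
```

The caller would remember the pair (CurrentFileNumber, returned offset) for each item, which is enough to find the data again later.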

Of course, writing the data to disk is only half the problem. We need to manage that as well. For that, I have the transaction log. Because I have an extremely simple set of operations (Enqueue and Dequeue), it is very easy to deal with it. Here is a view of the transaction log:

  image

The model is simple: each transaction is composed of a set of operations, and each operation contains the data in . When we commit a transaction, we wait for all write operations to complete, and then we append this data to the transaction log. The fact that we only ever append to the transaction log also pays off in terms of how fast sequential writes are. Periodically, and on graceful shutdown, we trim the transaction log, leaving only Enqueue operations there.

This, in turn, allows us to restore ourselves in the case of an error.
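The trimming step can be sketched as a replay of the log that keeps only the Enqueue operations whose items were never dequeued. The Operation type below and its FileNumber and Start fields are assumptions for illustration (I am guessing that an operation identifies its item by location in a data file); the real log format may differ.

```csharp
using System.Collections.Generic;

public enum OperationType { Enqueue, Dequeue }

// Hypothetical log entry: the fields are assumptions, not the real format.
public sealed class Operation
{
    public OperationType Type { get; set; }
    public int FileNumber { get; set; } // which data file holds the item
    public long Start { get; set; }     // offset of the item in that file
}

public static class TransactionLogTrimmer
{
    // Replays the log in order and returns only the Enqueue operations
    // that are still live, i.e. not cancelled out by a later Dequeue.
    public static List<Operation> Trim(IEnumerable<Operation> log)
    {
        var live = new List<Operation>();
        foreach (var op in log)
        {
            if (op.Type == OperationType.Enqueue)
            {
                live.Add(op);
            }
            else
            {
                // A Dequeue removes the matching Enqueue; neither one
                // needs to survive in the trimmed log.
                live.RemoveAll(e => e.FileNumber == op.FileNumber && e.Start == op.Start);
            }
        }
        return live;
    }
}
```

The linear RemoveAll is fine for a sketch; a real implementation would probably want a lookup keyed by location.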

All writes to the disk are done with WriteThrough enabled, so we touch the platter, and writing to the transaction log is protected using a strategy similar to the one described here, so a write can either succeed or fail as a whole.

A lot of the complexity is simplified by serializing all writes to the disk. This has some implications in terms of concurrency, but they are alleviated by using async I/O when it is appropriate. Overall performance is actually better in this scenario, because we reduce the number of seeks the disk has to do.
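For reference, this is roughly what opening a file with WriteThrough looks like in .NET. The DurableAppend helper is a hypothetical name for illustration; only the FileStream constructor and FileOptions.WriteThrough are the framework's own.

```csharp
using System.IO;

// Hypothetical helper: appends bytes to a file opened with WriteThrough.
public static class DurableAppend
{
    public static void Append(string path, byte[] data)
    {
        // FileOptions.WriteThrough asks the OS to write through its
        // intermediate cache and go directly to the disk.
        using (var stream = new FileStream(
            path, FileMode.Append, FileAccess.Write, FileShare.Read,
            4096, FileOptions.WriteThrough))
        {
            stream.Write(data, 0, data.Length);
            stream.Flush(); // push FileStream's own buffer down to the OS
        }
    }
}
```

Whether the bytes bypass the drive's own write cache as well depends on the hardware and driver, so treat this as a sketch of the API rather than a durability proof.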

Let me try to work through the ACID components and see if this works.

Atomicity

We should either completely succeed or completely fail. Let us assume that we have an error midway through a transaction. We have several options:

  • The system has already written the transaction and all its operations to the disk, in which case we consider this a successful transaction.
  • No data about the entries added or removed in this transaction was written to the transaction log, in which case the transaction never actually occurred, so we are back to a complete failure.
  • The most problematic part is handling partial transaction data written to the disk. Note that we have a count in the transaction log telling us how many operations to expect. We use this to verify that the transaction has been successfully written to the disk. If we find a partially complete transaction, we ignore it and fix the transaction log. As far as we are concerned, the transaction never happened.
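The third case can be sketched as a scan over the log on startup. The record layout here is an assumption for illustration: a 4-byte operation count followed by that many fixed-size operation records (17 bytes each in this sketch). The real format is whatever is in the repository. Note that this only detects a log that was cut short; it does not detect corrupted bytes inside a complete record.

```csharp
using System.IO;

// Hypothetical recovery scan. Assumed layout per transaction:
//   int32 operationCount, then operationCount fixed-size records.
public static class TransactionLogRecovery
{
    // Assumed record: 1 byte type + int32 file + int64 start + int32 length.
    private const int OperationSize = 1 + 4 + 8 + 4;

    // Returns the number of complete transactions found and truncates
    // any trailing partial transaction, as if it never happened.
    public static int Recover(string logPath)
    {
        int complete = 0;
        using (var stream = new FileStream(logPath, FileMode.Open, FileAccess.ReadWrite))
        using (var reader = new BinaryReader(stream))
        {
            long lastGood = 0;
            while (stream.Length - stream.Position >= 4)
            {
                int count = reader.ReadInt32();
                long needed = (long)count * OperationSize;
                if (count < 0 || stream.Length - stream.Position < needed)
                    break; // fewer operations on disk than the count promised

                stream.Seek(needed, SeekOrigin.Current);
                complete++;
                lastGood = stream.Position;
            }
            // "Fix the transaction log": drop everything after the last
            // fully written transaction.
            stream.SetLength(lastGood);
        }
        return complete;
    }
}
```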

Since we use WriteThrough, we can be sure that when we finish a write, it is on the disk itself, and we will not lose it in case of a system crash.

Note that there is one side effect of failed transactions: we do not recover disk space that went to data in lost transactions. This means that if you wrote a 1MB chunk, then rolled the transaction back, the data file will not reclaim the 1MB of space that was lost. This is intentional, since it appears that it is actually more costly to perform the bookkeeping than to merely lose the space temporarily.

Consistency

Consistency follows from the guarantees that we have in place for atomicity.

Isolation

We only add newly enqueued items to the global list (and the transaction log) when the transaction is committed. Dequeuing items, however, removes them from the global list and puts them on hold. If the transaction is rolled back, they return to the front of the queue; if the transaction is committed, their removal is logged to the transaction log.
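Here is an in-memory sketch of those isolation rules, with no disk involved. QueueModel, SessionModel and the explicit Rollback method are hypothetical names for illustration, and returning null from Dequeue on an empty queue is an assumption as well.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory model of the isolation rules; not the real classes.
public sealed class QueueModel
{
    private readonly LinkedList<byte[]> committed = new LinkedList<byte[]>();
    private readonly object sync = new object();

    public SessionModel OpenSession() { return new SessionModel(this); }

    public sealed class SessionModel
    {
        private readonly QueueModel queue;
        private readonly List<byte[]> pendingEnqueues = new List<byte[]>();
        private readonly List<byte[]> onHold = new List<byte[]>();

        internal SessionModel(QueueModel queue) { this.queue = queue; }

        // Invisible to other sessions until Flush is called.
        public void Enqueue(byte[] data) { pendingEnqueues.Add(data); }

        // Takes the item off the global list right away and holds it.
        public byte[] Dequeue()
        {
            lock (queue.sync)
            {
                if (queue.committed.Count == 0) return null; // assumption
                byte[] item = queue.committed.First.Value;
                queue.committed.RemoveFirst();
                onHold.Add(item);
                return item;
            }
        }

        // Commit: publish enqueues, make dequeues permanent.
        public void Flush()
        {
            lock (queue.sync)
            {
                foreach (byte[] item in pendingEnqueues)
                    queue.committed.AddLast(item);
            }
            pendingEnqueues.Clear();
            onHold.Clear();
        }

        // Rollback: held items go back to the front in their original order.
        public void Rollback()
        {
            lock (queue.sync)
            {
                for (int i = onHold.Count - 1; i >= 0; i--)
                    queue.committed.AddFirst(onHold[i]);
            }
            onHold.Clear();
            pendingEnqueues.Clear();
        }
    }
}
```

In the real thing, Flush is also the point where the operations are appended to the transaction log; the model above only tracks visibility.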

Durability

All data is written to disk without buffers, so when the data write is complete, we can be sure that a system failure will not wipe it.

And that is how I implemented ACID for a system that has only two actions. But there was another challenge.

Performance

As you probably noticed, I put a lot of thought into performance. In particular, write batching and read ahead were important factors in significantly improving the performance of the system.

Right now, the situation is good enough that I include the Performance Tests as part of the Unit Tests of the application. Those unit tests include several tests that read and write millions of items, and the total test suite completes in ~80 seconds.

Below you can see the profiler output of such a run. There are two things to note here: the amount of time spent calling to disk, and the number of calls that were made to the disk. I have put a lot of effort into minimizing the number of calls as much as possible, and it shows in the tests' performance.

image

Scale

I tested this with 5,000 items of 512 KB to 1 MB in size, both read and write. The test completed within 5 minutes, giving us a throughput of roughly 12 MB/s for a single queue.

I think that this is enough for now. Feel free to go through the code and rip it to pieces :-)