Rhino.Queues.Storage.Disk
The first thing you should know: the code for this is here: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/Rhino.Queues.Storage.Disk
After my disastrous attempt to get an off-the-shelf embedded storage engine that could actually handle multiple threads with halfway decent performance, I decided that I really had to write my own.
I am pointing this out for all the people who think that my first instinct is to write my own. Here is the API that it has:
```csharp
using (var queue = new PersistentQueue(path))
{
    using (var session = queue.OpenSession())
    {
        session.Enqueue(new byte[] { 1, 2, 3, 4 });
        session.Flush();
    }
    using (var session = queue.OpenSession())
    {
        CollectionAssert.AreEqual(new byte[] { 1, 2, 3, 4 }, session.Dequeue());
        session.Flush();
    }
}
```
If it reminds you of something, that is intentional. :-)
PersistentQueue is a singleton which allows you to create sessions; sessions are thread local and contain the API to Enqueue and Dequeue from the queue. You can see the Flush() call there: this call persists the actual enqueuing or dequeuing. Our changes are reverted if we do not call it.
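To make the Flush() semantics concrete, here is a sketch using the same API, assuming a queue that already contains the { 1, 2, 3, 4 } item: a dequeue that is never flushed is reverted when the session is disposed.

```csharp
using (var queue = new PersistentQueue(path))
{
    // Dequeue without calling Flush(); disposing the session
    // reverts the dequeue.
    using (var session = queue.OpenSession())
    {
        session.Dequeue();
        // no session.Flush() here
    }

    // The item is still at the front of the queue.
    using (var session = queue.OpenSession())
    {
        CollectionAssert.AreEqual(new byte[] { 1, 2, 3, 4 }, session.Dequeue());
        session.Flush();
    }
}
```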
One of the design decisions that I wanted for this project was transaction support; that is, it should support ACID. This is not a trivial thing to decide to implement, but I decided that I wanted to try it.
Let me describe how it works. The persistent queue uses three types of files:
- Transaction log
- Data files
- Lock file
The data file is simply where I place the data from the user, nothing more; entries are appended to it one after the other.
There are several optimizations in the way that I write to the data file. First, writing is always done sequentially, and the file is never edited. That is, once something is in the file, it is never touched again (except for reads, of course). This allows me to maintain very high write performance. Combining writes and deciding when to use async I/O vs. sync I/O also has significant implications.
Of course, because I never edit the file, I need some way to deal with data overflow. Once the file exceeds a predefined limit, I stop writing to it and start writing to a new file. Once all the items have been read from a file, and no more can be written to it, I delete it. This is a very simple strategy for managing the actual data. In fact, I use the current file size to size the next file, a simple heuristic that will hopefully avoid fragmentation.
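Here is a minimal sketch of that rollover strategy; the names and the size limit are illustrative, not taken from the actual code:

```csharp
using System.IO;

// Hypothetical sketch of the data file rollover strategy described above.
class DataFileManager
{
    long maxFileSize = 32 * 1024 * 1024; // the predefined limit (illustrative)
    int fileNumber;
    readonly string directory;

    public DataFileManager(string directory)
    {
        this.directory = directory;
    }

    public string CurrentFilePath => Path.Combine(directory, "data." + fileNumber);

    // Called after each append: once the file exceeds the limit, future
    // writes go to a fresh file sized like the current one, a simple
    // anti-fragmentation heuristic.
    public void MaybeRollOver(long currentLength)
    {
        if (currentLength < maxFileSize)
            return;
        maxFileSize = currentLength;
        fileNumber++;
    }

    // Called once every item in a full file has been dequeued; the file
    // will never be written to again, so it is simply deleted.
    public void DeleteExhaustedFile(string path) => File.Delete(path);
}
```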
Of course, writing the data to disk is only half the problem; we need to manage it as well. For that, I have the transaction log. Because I have an extremely simple set of operations (Enqueue and Dequeue), it is very easy to deal with.
The model is simple: each transaction is composed of a set of operations, and each operation contains the data associated with it. When we commit a transaction, we wait for all write operations to complete, and then we append the transaction to the log. The fact that we only ever append to the transaction log also pays off, given how fast sequential writes are. Periodically, and on graceful shutdown, we trim the transaction log, leaving only Enqueue operations there.
This, in turn, allows us to restore ourselves in the case of an error.
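Here is a hedged sketch of what a commit might look like; the record layout (an operation count followed by the operations, each pointing into a data file) is inferred from the description here and in the atomicity section below, not copied from the actual code:

```csharp
using System.Collections.Generic;
using System.IO;

enum OperationType : byte { Enqueue = 1, Dequeue = 2 }

class Operation
{
    public OperationType Type;
    public int FileNumber; // which data file holds the payload
    public long Position;  // where the payload starts in that file
    public int Length;     // payload length in bytes
}

static class TransactionLog
{
    // Called only after all data file writes have completed; this append
    // is the commit point of the transaction.
    public static void Commit(Stream log, IList<Operation> operations)
    {
        var writer = new BinaryWriter(log);
        writer.Write(operations.Count); // the count that recovery checks
        foreach (var op in operations)
        {
            writer.Write((byte)op.Type);
            writer.Write(op.FileNumber);
            writer.Write(op.Position);
            writer.Write(op.Length);
        }
        writer.Flush(); // with WriteThrough, the bytes reach the platter
    }
}
```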
All writes to the disk are done with WriteThrough enabled, so we touch the platter, and writing to the transaction log is protected using a strategy similar to the one described here, so a write either succeeds completely or fails completely.
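In .NET terms, this means opening the file with FileOptions.WriteThrough; a minimal sketch (the file name is illustrative):

```csharp
using System.IO;

// Every write is pushed through the OS cache to the disk itself.
var log = new FileStream(
    "transaction.log",
    FileMode.Append,
    FileAccess.Write,
    FileShare.Read,
    bufferSize: 0x10000,
    FileOptions.WriteThrough);
```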
A lot of the complexity is reduced by serializing all writes to the disk. This has some implications for concurrency, but that is alleviated by using async I/O where appropriate. Overall performance is actually better this way, because we reduce the number of seeks the disk has to do.
Let me try to work through the ACID components and see if this works.
Atomicity
We should either completely succeed or completely fail. Let us assume that we have an error midway through a transaction. We have several options:
- The system has already written the transaction and all its operations to the disk, in which case we consider this a successful transaction.
- No data was written to the transaction log about the entries added / removed in this transaction, in which case the transaction never actually occurred, so we are back to completely failed.
- The most problematic part is handling partial transaction data written to the disk. Recall that we have a count in the transaction log telling us how many operations to expect; we use this to verify that the transaction was successfully written to the disk. If we find a partially written transaction, we ignore it and fix the transaction log (see the sketch after this list). As far as we are concerned, the transaction never happened.
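Here is that sketch: a hedged version of the recovery pass, assuming the hypothetical log format from the commit sketch above. A transaction whose operations cannot all be read is treated as if it never happened, and the log is truncated at the last complete transaction:

```csharp
using System.IO;

static void RecoverLog(string path)
{
    using (var stream = new FileStream(path, FileMode.Open, FileAccess.ReadWrite))
    {
        var reader = new BinaryReader(stream);
        long lastGood = 0;
        try
        {
            while (stream.Position < stream.Length)
            {
                int count = reader.ReadInt32(); // operations to expect
                for (int i = 0; i < count; i++)
                {
                    reader.ReadByte();  // operation type
                    reader.ReadInt32(); // file number
                    reader.ReadInt64(); // position
                    reader.ReadInt32(); // length
                }
                lastGood = stream.Position; // a full transaction was read
            }
        }
        catch (EndOfStreamException)
        {
            // Partial transaction at the tail; as far as we are
            // concerned, it never happened.
        }
        stream.SetLength(lastGood); // fix the log by dropping the tail
    }
}
```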
Since we use WriteThrough, we can be sure that when we finish a write, it is on the disk itself, and we will not lose it in case of a system crash.
Note that there is one side effect of failed transactions: we do not recover the disk space that went to data in lost transactions. This means that if you write a 1MB chunk and then roll the transaction back, the data file will not reclaim that lost MB of space. This is intentional, since it appears that it is actually more costly to perform the bookkeeping than to merely lose the space temporarily.
Consistency
Consistency follows from the guarantees that we have in place for atomicity.
Isolation
We only add newly enqueued items to the global list (and the transaction log) when the transaction is committed. Dequeuing items, however, removes them from the global list and puts them on hold: if the transaction is rolled back, they return to the front of the queue; if the transaction is committed, their removal is logged to the transaction log.
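In memory, the idea looks roughly like this sketch; the names are illustrative and locking is omitted for brevity, so this is not the actual implementation:

```csharp
using System.Collections.Generic;

class Session
{
    readonly LinkedList<byte[]> globalQueue; // shared between all sessions
    readonly List<byte[]> pendingEnqueues = new List<byte[]>();
    readonly Stack<byte[]> dequeuedOnHold = new Stack<byte[]>();

    public Session(LinkedList<byte[]> globalQueue)
    {
        this.globalQueue = globalQueue;
    }

    // Invisible to other sessions until Flush().
    public void Enqueue(byte[] data) => pendingEnqueues.Add(data);

    // Assumes the queue is not empty; the item is removed from the
    // global list but only put "on hold" until Flush().
    public byte[] Dequeue()
    {
        var item = globalQueue.First.Value;
        globalQueue.RemoveFirst();
        dequeuedOnHold.Push(item);
        return item;
    }

    public void Flush()
    {
        foreach (var data in pendingEnqueues)
            globalQueue.AddLast(data); // now visible to everyone
        pendingEnqueues.Clear();
        dequeuedOnHold.Clear(); // removals become permanent
    }

    public void Rollback()
    {
        // Dequeued items return to the front of the queue, in order.
        while (dequeuedOnHold.Count > 0)
            globalQueue.AddFirst(dequeuedOnHold.Pop());
        pendingEnqueues.Clear();
    }
}
```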
Durability
All data is written to disk with no buffering, so once a data write completes, we can be sure that a system failure will not wipe it out.
And that is how I implemented ACID for a system that has only two actions. But there was another challenge.
Performance
As you probably noticed, I put a lot of thought into performance. In particular, write batching and read ahead were important factors in significantly improving the performance of the system.
Right now, the situation is good enough that I include the Performance Tests as part of the Unit Tests of the application. Those unit tests include several tests that read and write millions of items, and the total test suite completes in ~80 seconds.
Below you can see the profiler output of such a run. There are two things to note here: first, the amount of time spent calling to disk, and second, the number of calls that were made to the disk. I have put a lot of effort into minimizing the number of calls as much as possible, and it shows in the test performance.
Scale
I tested this with 5,000 items of 512KB - 1MB in size, both read and write. The test completed within 5 minutes; at an average of ~0.75MB per item, that is ~3.7GB each way, giving us a throughput of roughly 12 MB/s for a single queue.
I think that this is enough for now, feel free to go through the code and rip it to pieces :-)
Comments
Cool stuff. Question - in your BDB experiments why did you use a BTree instead of their Queue table storage? Just curious. I've used it with great success from C++ and Java having 100's of threads access it.
From their docs...
"Data is stored in a queue as fixed-length records. Each record uses a logical record number as its key. This access method is designed for fast inserts at the tail of the queue, and it has a special operation that deletes and returns a record from the head of the queue.
This access method is unusual in that it provides record level locking. This can provide beneficial performance improvements in applications requiring concurrent access to the queue."
Justin,
I used a queue of (comb) guids as well as a btree keyed by the guids together. That allowed fast access and still let me have variable length data.
As for threading issues, there is a bug in BDB 4.5, which is the only version that has a .NET API. I wasn't going to write my own.
Excellent! I can't wait to put this thing to work.
So if you wanted to have several queues in a single process, I presume you would create several PersistentQueue objects (one per queue) each with a different path? Any way to have "named" queues that can share a path?
Yes, you would.
Several named queues:
new PersistentQueue("QueuesDir", "foo")
new PersistentQueue("QueuesDir", "bar")
Each would have their own sub directory in the main one
You should contact Alex from x-tensive.com.. They are also building a DB Storage without a DB, see
http://blog.x-tensive.com/2008/02/still-no-ctp.html
and other blogposts..
@Marco: isn't x-tensive doing that already for the past, what... 3 years? Every time they're re-re-re-re-re-rewriting what they have to capture even more things in the universe.
@Frans, you're watching your competitors very closely.. ;-)
You're a one-man API factory. Amazing.