So, here I am writing some really fun code, when I found out that I was running into deadlocks in the code. I activated emergency protocols and went into deep debugging mode.
After being really thorough in figuring out several possible causes, I was still left with what is effectively a WTF @!(*!@ DAMN !(@*#!@* YOU !@*!@( outburst and a sudden longing for something to repeatedly hit.
Eventually, however, I figured out what was going on.
I have the following method: Aggregator.AggregateAsync(), inside which we have a call to the PulseAll method. That method will then go and execute the following code:
After that, I return from the method. In another piece of the code (Aggregator.Dispose) I am waiting for the task that is running the AggregateAsync method to complete.
Nothing worked! It took me a while before I figured out that I wanted to check the stack, where I found this:
Basically, I had a deadlock because when I called SetResult on the completion source (which freed the Dispose code to run), I actually switched over to that task and allowed it to run. Still on the same thread, but in a different task, I ran through the rest of the code and eventually got to the Aggregator.Dispose(). Now, I could only get to it if the PulseAll() method was called. But, because we are on the same thread, the AggregateAsync task hasn’t completed yet!
In the end, I “solved” that by introducing a DisposeAsync() method, which allowed us to yield the thread; once we did, the AggregateAsync task could complete, and we could move on.
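In rough outline, the workaround looks like the sketch below. The field name (_aggregationTask) and the use of Task.Yield() to get off the current thread are my reading of the description above, not the actual code:

```csharp
// Sketch only: DisposeAsync gives up the current thread before waiting, so the
// AggregateAsync task (which we may still be "inside of") gets a chance to complete.
public async Task DisposeAsync()
{
    await Task.Yield();      // get off the thread that is still running the aggregation task
    await _aggregationTask;  // hypothetical field holding the task running AggregateAsync()
}
```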
But I am really not very happy about this. Any ideas about the proper way to handle async & IDisposable?
As I mentioned, I ran into a very nasty issue with the TPL. I am not sure if it is me doing things wrong, or an actual issue.
Let us look at the code, shall we?
We start with a very simple piece of code:
This is effectively an auto reset event. All the waiters will be released when PulseAll is called. Then we have this runner, which just executes work:
And finally, the code that causes the problem:
So far, it is all pretty innocent, I think you would agree. But this code hangs with a deadlock. Here is why:
Because tasks can share threads, we are on the background task’s thread, and we are trying to wait for that background task to complete.
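Boiled down, the failure mode looks something like this sketch (a minimal reproduction of the description above, not the original test code):

```csharp
using System.Threading.Tasks;

static async Task Deadlock()
{
    var signal = new TaskCompletionSource<object>();

    Task background = Task.Run(() =>
    {
        // ... do the actual work ...
        signal.SetResult(null); // the awaiter below can resume right here, on this thread
                                // (assuming the await below is already pending at this point)
    });

    await signal.Task;  // with no SynchronizationContext, we resume inline on the background thread
    background.Wait();  // we are still inside 'background', so it can never complete: deadlock
}
```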
If I add:
Because that forces this method to be completed on another thread, but that looks more like something that you add after you discover the bug, to be honest.
And… this test just passed!
Just to give you some idea, this is sitting on top of RavenDB’s implementation of leveldb. In fact, I have been using this code to test out the leveldb implementation.
But this actually stores all the events, runs the aggregation over them and gives you the aggregated results. And the entire thing works quite nicely, even if I say so myself.
Starting with build 2603, we are now considering 2.5 to be a release candidate. It is feature frozen, and only bug fixes are going in.
You can download the new version here: http://hibernatingrhinos.com/builds/ravendb-unstable-v2.5, and it is also available on nuget as an unstable release.
There have been a lot of changes, most of which were recorded on this blog. Next week I’ll do a proper post about what is new, but in the meantime, you can check the release notes.
I urge you to download it and take it for a spin. We really need feedback from users about how it works, and any problems that you might run into.
Because, clearly, that is what is missing. RavenDB GetAll extension method
It appears that in my previous post I had an issue with how I read the code. In particular, I looked at the commit log and didn’t look at the most recent changes with regards to how HyperLevelDB does the writes. Robert Escriva has been kind enough to point me in the right direction.
The way that this works is a lot more elegant, I think.
When you want to make a write to a file, you ask for a segment at a particular offset. If we have that offset already mapped, we give it to the caller. Otherwise, we increase the file size if needed, then map the next segment. That part is done under a lock, so there isn’t an issue of contention over the end of the file. That is much nicer than the pwrite method.
That said, however, I am still not sure about the issue with the two concurrent transactions. What actually happens here is that while we gained concurrency in the IO, there is still some serialization going on. In other words, even though transaction B was actually flushed to disk before transaction A, it would still wait for transaction A to complete, alleviating the concern that I had about it.
As mentioned earlier, HyperDex has made some changes to LevelDB to make it work faster for their scenarios. I was curious to see what changed, so I took a look at the code. In my previous post, I dealt with compaction, but now I want to deal exclusively with the changes that were made to leveldb to make writes more concurrent.
Update: it appears that I made a mistake in reading the code (I didn't check the final version). See this post for the correction.
Another change that was made, which I am really not sure that I am following, is the notion of a concurrent log writer. This relies on the pwrite() method, which allows you to write a buffer to a file at a specified position. I have not been able to figure out what is going on if you have concurrent writes to that file. The HyperDex modifications include synchronization on the offset where they will actually make the write, but after that, they make concurrent calls. It makes sense, I guess, but I have several problems with that. I was unable to find any details about the behavior of the system when making concurrent calls to pwrite() at the end of the file, especially since your position might be well beyond the current end of file.
I couldn’t figure out what the behavior was supposed to be under those conditions, so I fired up a Linux machine and wrote the following code:
I’ll be the first to admit that this is ugly code, but it gets the job done, and it told me that you could issue those sorts of writes, and it would do the expected thing. I am going to assume that it would still work when used concurrently on the same file.
That said, there is still a problem. Let us assume the following sequence of events:
- Write A
- Write B
- Write A is allocated range [5 – 10] in the log file
- Write B is allocated range [10 – 15] in the log file
- Write B is done and returns to the user
- Write A is NOT done, and we have a crash
Basically, we have here a violation of durability, because when we read from the log, we will get to the A write, see it is corrupted and stop processing the rest of the log. Effectively, you have just lost a committed transaction. Now, the log format actually allows that to happen, and a sophisticated reader can recover from that, but I haven’t yet seen any signs that this was implemented.
To be fair, I think that the log reader should be able to handle zero'ed data and continue forward. There is some sort of a comment about that. But that isn't supported by the brief glance that I took, and more importantly, it won't help if you crashed midway through writing A, so you have corrupted (not zero'ed) data on the file. This would also cause the B transaction to be lost.
The rest appears to be just thread safety and then allowing concurrent writes to the log, which I have an issue with, as I mentioned. But I am assuming that this will generate a higher rate of writes, since there is a lot less waiting. That said, I am not sure how useful that would be. There is still just one physical needle that writes to the disk. I am guessing that it really depends on whether or not you need the call to be synced. If you do, there are going to be a lot more fsync() calls than before, when they were merged into a single call.
The fun part is that I can easily check just the things that were changed by HyperDex; I don’t need to go read and understand a completely new engine. I’m currently just looking at the logs, and noting important stuff.
A lot of the work appears to have been concentrated on the notion of compaction. In particular, compaction means that we need to merge multiple files into a single file at a higher level. That is great, until you realize that this may mean that you’ll be writing a lot more data than you thought you would, because you keep copying the data from one file to another as it bubbles deeper into the levels.
The first thing that seems to have happened is that when you get to a deep enough level, the file sizes that you are allowed to have become much larger. That means that you’ll have more data at the higher levels, which likely reduces the number of times that you need to compact things.
The next step in the process was to create two parallel processes. One handles the common compaction from memory to level-0, and the other handles level-0 and down. I assume that this was done to avoid contention between the two. It is far more common to have a memory to level-0 compaction than level-0 down, and having to sync between the two is likely causing some slowdown. The next change I noticed was tightening locks. Instead of having a single lock that was used for most writes, there now appear to be more granular locks, so you gain more concurrency under parallel load.
Next, a change was made to the compaction heuristics. I am not sure that I understand the changes yet, especially since I am just going through the commit log now. One thing I noticed right away is that they removed the seek budget that you had for files. It appears that compactions triggered by reads were a source of too much pain for HyperDex, so it was removed. Note that HyperDex appears to be very interested in consistent performance for high write scenarios.
Looking at the logs, it appears that there were a lot of strategies attempted to get a better compaction strategy for what HyperDex is doing. Interestingly enough (and good for them), I see a lot of back & forth: trying something out, then backing out of it, etc. Sometimes over the course of several weeks or months. A lot of that appears to be more like heuristically setting various things, like the number of compacting threads, the various config options, etc. Trying to find the best match.
Another thing that I guess improved performance is that leveldb will intentionally slow down writes when it is doing compaction, or just about to, to reduce the load when heavy background operations are running. HyperLevelDB removed that, so you will have more load on the system, but no artificial governors on the system performance.
But coming back to compactions, it appears that what they ended up doing is to have three levels of background compactions.
- Memory Table –> Level0 files.
- Optimistic Compaction
- Background Compaction
To be honest, I think that at least between the last two there is a lot of code that can be shared, so it is a bit hard to read, but basically: we have an optimistic compaction thread running that waits for enough data to come through to level-0, at which point it will try to do a low cost, high yield compaction. If that isn’t enough, we have the background compaction that will kick in, but that seems to be reserved for when there is really no other alternative.
Given the importance of compaction to performance, it would probably make more sense to split them into a separate concept, but I guess that they didn’t want to diverge too much from the leveldb codebase.
After having struggled with understanding Paxos for a while, I ran into the Raft draft paper, and I was very impressed. To start with, I have read at least four or five papers about Paxos, read 2 – 5 implementations of the algorithm in different languages, implemented it myself, and I am still not really comfortable with it.
On the other hand, after doing a single pass through the Raft paper, I have a much greater sense of understanding what it is about, how it works and even how I can implement that, if I want to. Hell, I fully expect to be able to hand that paper to a passerby CS student and get a working implementation without needing to get Sheldon Cooper involved. One thing to note, this paper and algorithm were heavily focused on making this understandable, and I think that they were quite successful in doing that. For that matter, I wish that other papers were this easy to read and follow.
Very interesting, and unlike the Paxos paper, immediately and almost painfully obvious how you would actually make use of something like that.
Kellabyte rightfully points out that leveldb was designed primarily for mobile devices, and that there are many things that could have been done but weren’t because of that design goal. I was already pointed to HyperDex’s port of leveldb, and I am also looking into what Basho is doing with it.
You can read a bit about what HyperDex did to improve things here. And Basho challenges are detailed in the following presentation. The interesting thing about this is why people want to go to leveldb to start with.
To put it simply, it is a really good product, in the sense that it does what it needs to do, and it is small enough that you can understand it in reasonably short order. The fun starts when you disagree with the decisions made for you by leveldb; it is actually quite easy to make changes to the root behavior of the project without making significant changes to the overall architecture. Looking over the HyperDex changes overview, it appears that they were able to make very targeted changes. All the hard problems (storing, searching, atomicity, etc.) were already solved; now the problem is optimizing them.
I’m going to review the changes made by both Basho & HyperDex to leveldb, because I think that it would be a fascinating object of study. In particular, I was impressed with how leveldb merged transactions, and I am looking forward to seeing how HyperDex parallelized them.
I was pointed to this article about friction in software, in particular because I talk a lot about zero friction development. Yet the post shows a totally different aspect of friction.
I would actually agree with that definition of friction. The problem is that there are many types of friction. There is the friction that you have to deal with when you build a new project, in a new domain or using new technologies: you need to figure out how to make those things work, and you spend a lot of your time just getting things working. Then there is the friction of the process you are using. If you need to have a work item associated with each commit (and only one commit), it means that you are going to either commit more slowly, or spend a lot more time writing effectively useless work items. Then you have the friction of the common stuff. If your users require fairly elaborate validation, even a simple date entry form can become an effort that takes multiple days.
All of those are various types of friction. And all of those add up to the time & cost of building software. I tend to divide them in my head into friction & taxes. Friction is everything that gets in the way unnecessarily. For example, if I need to spend a day making a release, that is friction that I can easily automate away. If I need to spend a lot of time on the structure of my software, that is friction. If I have checklists that I need to go through, that is usually friction.
Then there are the taxes, stuff that you do because you have to. In my case, this is usually error handling and production readiness, failure analysis and recovery, etc. In other cases this can be i18n, validation or the like. Those are things that you can streamline, but you can’t really reduce. If you need to run in an HA environment, you are going to need to write the code to do that, and test it. And that ain’t friction, even though it slows you down. It is just part of the cost of doing business.
And, of course, we have the toll trolls. That is what I call those things that you don’t have to do, but that are usually forced upon you. Sometimes it is for a valid reason, but a lot of the time it is Just Because. Examples of tolls being laid upon software include such things as having to integrate with the CRM / ERP / TLD of the day. Being forced to use a specific infrastructure, even though it is wildly inappropriate. And, possibly the best thing ever: “I went golfing with Andrew this weekend, and I have great news. We are a Java shop now!”
So, we have events sitting in a stream, and you can append to them as long as you like. And you can read from them, too. And you can also generate aggregations and look at those. In the previous post, I discussed billing for a phone company as the example.
Let us consider this for a second. You’ll probably have aggregation per customer per month. So loading them would probably look something like:
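Something along these lines, perhaps (the type, the id convention and the API here are purely illustrative, since none of this exists yet):

```csharp
// Hypothetical: load the pre-aggregated statement for a customer for a given month
var statement = session.Load<MonthlyStatement>("statements/customers/1234/2013-05");
Console.WriteLine("SMS charges for the month: {0}", statement.SmsTotal);
```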
And that is nice & all, but what about the more complex things? What happens if I want to search for high billing statements? Or all statements from the past year? What happens if I want to do a lookup by the customer name, not its id?
Well, I already know how to use Lucene, so we can plug this into the system and allow you to sear…
Wait a second! I already wrote all of that. As it turns out, a lot of the stuff that we want to do is already done. RavenDB has it implemented. For that matter, storing the aggregation results in RavenDB is going to introduce a lot of additional options. For example, we can do another map/reduce (inside RavenDB) to give you additional aggregation. We can do full text search, enrich the data and a lot more.
So that is what will probably happen. We will be writing the aggregated results from Raven Streams into RavenDB as standard documents, and you could then process them further using RavenDB’s standard tools.
Previously we introduced the following index (not sure if that would be a good name, maybe aggregation or projection?):
Let us consider how it works from the point of view of the system. The easy case is when we have just one event to work on. We are going to run it through the map and then through the reduce, and that would be about it.
What about the next step? Well, on the second event, all we actually need to do is run it through the map, then run it and the previous result through the reduce. The only thing we need to remember is the final result. No need to remember all the pesky details in the middle, because we don’t have the notion of updates / deletes to require them.
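A concrete sketch of that flow, using the SMS billing example (all the names here are illustrative, not actual Raven Streams code):

```csharp
// Because events are append-only, the only state we keep per customer is the latest
// reduced value; each new event is mapped and folded into it, nothing else is remembered.
var totals = new Dictionary<string, decimal>(); // customer id -> aggregated charge so far

void OnSmsEvent(string customerId, decimal charge)
{
    decimal current;
    totals.TryGetValue(customerId, out current); // the previous reduce result (0 if none yet)
    totals[customerId] = current + charge;       // reduce(previous result, map(new event))
}
```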
This makes the entire process so simple it is ridiculous. I am actually looking forward to doing this, if only because I had to dig through a lot of complexity to get RavenDB’s map/reduce indexes to where they are now.
The major reason for streams is the idea that you don’t really care about each individual item on its own. What you care about a lot more is some sort of aggregation over those values. And sure, you do want to be able to access the values, but you generally don’t need to.
Let us say that you are a phone company, and you want to use Raven Streams to record all the events that happened, so you can bill on them. Let us imagine that we are interested in just SMS for the moment, so we append each SMS to the stream.
Then we are going to write something like:
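Something along these lines, borrowing RavenDB’s map/reduce index syntax (this is my sketch of the shape, not actual Raven Streams code):

```csharp
Map = smsEvents => from sms in smsEvents
                   select new { sms.CustomerId, sms.Charge, Count = 1 };

Reduce = results => from result in results
                    group result by result.CustomerId into g
                    select new
                    {
                        CustomerId = g.Key,
                        Charge = g.Sum(x => x.Charge),
                        Count = g.Sum(x => x.Count)
                    };
```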
If you have ever written RavenDB map/reduce indexes, this should be very familiar to you. However, unlike RavenDB, here we don’t need to handle any pesky updates or deletes. That means that the implementation is much simpler, but I’ll discuss that in my next post.
In the meantime, let us consider what the result of this would be. It would generate a result, which we would persist and allow you to look up. One can imagine that you could do this via the customer id, and get the sum total as it is right now.
But you’ll probably want to do additional operations, so we need to consider this as well.
For that matter, imagine the scenario where we want to get the data about SMS, MMS, phone calls, etc. What would you expect that to look like?
Raven Streams is a working title.
So far, I have been working mostly on getting things working from an infrastructure perspective. That is, I want to be able to successfully write & read from the streams, and I have a general architecture for it.
Basically, we have something like this:
This is a single stream (of which we expect to have more than one, but not a very high number). The stream makes all writes to a mem table, backed by a log. When the mem table is full, we switch to a new memtable and write all the data to a sorted string table (SST). This is very much like the design of leveldb.
Internally, however, we key all the data by etags. That is, you don’t have an actual key that you can use; the stream will generate an etag for you. And the SSTs are sorted in order, so the oldest events are in the oldest file, etc.
On startup, if we haven’t flushed properly, we will translate the log file into a new SST and start from scratch. I am not sure yet if we want / need to implement compaction. Maybe we will do that for very small files only.
Right now there are two methods that you actually care about:
This is how you use them:
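Roughly, usage looks like the following sketch. The method names (AppendAsync, ReadFrom) and the event shape are my assumptions based on the description above, not a published API:

```csharp
// append an event; the stream hands back the etag it generated for it
var etag = await stream.AppendAsync(new { Type = "SmsSent", Customer = "customers/1234", Charge = 0.02m });

// read events in etag order, starting from the oldest one
foreach (var @event in stream.ReadFrom(etag: null))
{
    Console.WriteLine(@event);
}
```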
This is all working pretty nicely right now. It isn’t nearly ready for production or even test usage, but this is encouraging. Next, I’ll discuss the intended implementation of map/reduce against this.
Continuing with my work on porting leveldb to .NET, we ran into another problem: the log file. The log file is pretty important; this is how you ensure durability, so any problems there are a big cause for concern.
You can read a bit about the format used by leveldb here, but basically, it uses the following:
A block is 32KB in size.
The type can be First, Middle, End or Full, since it is legitimate to split a record across multiple blocks. The reasoning behind this format is outlined in the link above.
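For reference, this is the record layout as I understand it from the leveldb log format document linked above (reconstructed from that document, not from this post):

```csharp
// Each 32KB block holds a sequence of records; each record has a 7 byte header.
const int BlockSize  = 32 * 1024;   // 32KB
const int HeaderSize = 4 + 2 + 1;   // checksum (crc32c, 4 bytes) + length (2 bytes) + type (1 byte)

enum RecordType : byte
{
    Full   = 1,
    First  = 2,
    Middle = 3,
    Last   = 4, // referred to as "End" above
}
```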
It is also a format that assumes that you know, upfront, the entire size of your record, so you can split it accordingly. That makes a lot of sense when working in C++ and passing buffers around.
This is straightforward in C++, where the API is basically:
Status Writer::AddRecord(const Slice& slice)
(Slice is basically just a byte array).
In .NET, we do not want to be passing buffers around, mostly because of the impact on the LOH (Large Object Heap). So we had to be a bit smarter about things. In particular, we had an interesting issue with streaming the results. If I want to write a document with a size of 100KB, how do I handle that?
Instead, I wanted this to look like this:
The problem with this approach is that we don’t know, upfront, what size we are going to have. This means that we don’t know how to split the record, because we don’t have the record until it is over. And we don’t want to (actually, can’t) go back in the log and change things to set the record straight (pun intended).
What we ended up doing is this:
Note that we explicitly mark the start / end of the record, and in the meantime, we can push however many bytes we want. Internally, we buffer up to 32KB in size (a bit less, actually, but good enough for now) and, based on the next call, we decide how the current block should be marked.
The reason this is important is that this allows us to actually keep the same format as leveldb, with all of the benefits for dealing with corrupted data, if we need to. I also really like the idea of being able to have parallel readers on the log file, because we know that we can just skip at block boundaries.
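The shape of the API ends up looking something like this sketch (the method and variable names here are mine; the original code isn't reproduced):

```csharp
// Sketch: the writer buffers up to ~32KB internally; only once it knows whether more data
// is coming for this record can it decide how to mark the block it is about to emit
// (Full / First / Middle / End), which is why the record boundaries are explicit.
var buffer = new byte[4096];
logWriter.StartRecord();
int read;
while ((read = documentStream.Read(buffer, 0, buffer.Length)) > 0)
{
    logWriter.Write(buffer, 0, read);   // push however many bytes we want, in small chunks
}
logWriter.EndRecord();                  // only now can the final block for this record be sealed
```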
Scripted Index Results (I wish it had a better name) is a really interesting new feature in RavenDB 2.5. As the name implies, it allows you to attach scripts to indexes. Those scripts can operate on the results of the indexing.
Sounds boring, right? But the options that it opens up are anything but. Using Scripted Index Results you can get recursive map/reduce indexes, for example. But we won’t be doing that today. Instead, I’ll show how you can enhance entities with additional information from other sources.
Our sample database is Northwind, and we have defined the following index to get some statistics about our customers:
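The exact index from the post isn't reproduced here; a plausible version of it, reducing the Northwind orders by company while keeping a count and a total, would be:

```csharp
using System.Linq;
using Raven.Client.Indexes;

public class Customers_Stats : AbstractIndexCreationTask<Order, Customers_Stats.Result>
{
    public class Result
    {
        public string Company;
        public int Count;
        public decimal Total;
    }

    public Customers_Stats()
    {
        Map = orders => from order in orders
                        select new
                        {
                            order.Company,
                            Count = 1,
                            Total = order.Lines.Sum(l => (l.Quantity * l.PricePerUnit) * (1 - l.Discount))
                        };

        Reduce = results => from result in results
                            group result by result.Company into g
                            select new
                            {
                                Company = g.Key,
                                Count = g.Sum(x => x.Count),
                                Total = g.Sum(x => x.Total)
                            };
    }
}
```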
And we can query it like this:
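For example (assuming the reconstructed index and Result class above):

```csharp
using (var session = store.OpenSession())
{
    var stats = session.Query<Customers_Stats.Result, Customers_Stats>()
        .FirstOrDefault(x => x.Company == companyId);
    // stats.Count and stats.Total now hold the aggregated values for that company
}
```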
However, what we want to do is to be able to embed those values inside the company document, so we won’t have to query for it separately. Here is how we can use the new Scripted Index Results bundle to do this:
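The setup is a document per index; a sketch of what it could look like for the index above (the scripts and names here are my reconstruction of the idea of writing the aggregated values back onto the company document, so treat the details as approximate):

```csharp
using Raven.Abstractions.Data;

using (var session = store.OpenSession())
{
    session.Store(new ScriptedIndexResults
    {
        Id = "Raven/ScriptedIndexResults/Customers/Stats",
        IndexScript = @"
var company = LoadDocument(this.Company);
if (company == null) return;
company.Orders = { Count: this.Count, Total: this.Total };
PutDocument(this.Company, company);",
        DeleteScript = @"
var company = LoadDocument(key);
if (company == null) return;
delete company.Orders;
PutDocument(key, company);"
    });
    session.SaveChanges();
}
```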
Once we have defined that, whenever the indexing is done, it will run these scripts, and that, in turn, means that this is what our dear ALFKI looks like:
I’ll leave recursive map/reduce as a tidbit for my dear readers.
One of the nice things about having a more realistic demo data set is that we can now actually show demos on that data. I didn’t realize how much of an issue that was until we actually improved things.
Let us see how we are going to demo RavenDB’s dynamic reporting feature.
We start by creating the following index. It is a pretty simple one, the one thing to notice being that we are explicitly setting the Sort mode for Total to be Double.
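The index itself isn't shown here; a plausible version, assuming the Northwind sample classes, would be:

```csharp
using System.Linq;
using Raven.Abstractions.Indexing;
using Raven.Client.Indexes;

public class Orders_Total : AbstractIndexCreationTask<Order, Orders_Total.Result>
{
    public class Result
    {
        public string Employee;
        public string Company;
        public double Total;
    }

    public Orders_Total()
    {
        Map = orders => from order in orders
                        select new
                        {
                            order.Employee,
                            order.Company,
                            Total = order.Lines.Sum(l => (l.Quantity * l.PricePerUnit) * (1 - l.Discount))
                        };

        // the important bit for reporting: index Total as a numeric (double) field
        Sort(x => x.Total, SortOptions.Double);
    }
}
```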
Now that we have done that, we are going to go to Query > Reporting:
And then I can start issuing reporting queries:
This is the equivalent of doing:
select EmployeeID, sum(tot.Total) Total
from Orders o
join (
    select sum((Quantity * UnitPrice) * (1 - Discount)) Total, OrderID
    from [Order Details]
    group by OrderID
) tot on o.OrderID = tot.OrderID
where o.CustomerID = @CustomerId
group by EmployeeID
The nice thing about this, and what makes this feature different from standard map/reduce, is that you can filter the input data into the aggregation.
In code, this would look something like this:
session.Query<Order>("Orders/Total")
    .Where(x => x.Company == companyId)
    .AggregateBy(x => x.Employee)
    .SumOn(x => x.Total)
    .ToList();
Pretty nice, even if I say so myself.
RavenDB has always come with some sample data that you could use to test things out. Unfortunately, that sample data was pretty basic, and didn’t really cover a lot of interesting scenarios.
For RavenDB 2.5, we updated the sample data to use the brand new and amazing (wait for it…) Northwind database.
At a minimum, it would make demoing stuff easier. And in order to make things even nicer, you can get the C# classes for the sample data here.
The following code has a very subtle bug:
I’ll even give you a hint, try to run the following client code:
Well, it is nearly the 29th of May, and that means that I have been married for two years.
All you have to do is purchase any of our products using the following coupon code:
This offer is valid to the end of the month, so hurry up.
One of the things that we are planning for Raven 3.0 is the introduction of additional options. In addition to RavenDB, we will also have RavenFS, which is a replicated file system with an eye toward very large files. But that isn’t what I want to talk about today. Today I would like to talk about something that is currently just in my head. I don’t even have a proper name for it yet.
Here is the deal, RavenDB is very good for data that you care about individually. Orders, customers, etc. You track, modify and work with each document independently. If you are writing a lot of data that isn’t really relevant on its own, but only as an aggregate, that is probably not a good use case for RavenDB.
Examples of such things include logs, click streams, event tracking, etc. The trivial example would be any reality show, where you have a lot of users sending messages to vote for a particular candidate, and you don’t really care about the individual data points, only the aggregate. Other things might be wanting to track how many items were sold in a particular period by region, etc.
The API that I had in mind would be something like:
And then you can write map/reduce statements on them like this:
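Neither piece exists yet, so here is a purely hypothetical sketch of what I have in mind (every name here is made up):

```csharp
// appending raw events (you never go back to modify or delete them)
await stream.AppendAsync(new { Candidate = "candidates/7", At = DateTime.UtcNow });

// a map/reduce statement over the stream, in the RavenDB style
Map = votes => from vote in votes
               select new { vote.Candidate, Count = 1 };

Reduce = results => from result in results
                    group result by result.Candidate into g
                    select new { Candidate = g.Key, Count = g.Sum(x => x.Count) };
```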
Yes, this looks pretty much like you would have in RavenDB, but there are important distinctions:
- We don’t allow modifying writes, nor deleting them.
- Most of the operations are assumed to be made on the result of the map/reduce statements.
- The assumption is that you don’t really care for each data point.
- There is going to be a lot of those data points, and they are likely to be coming in at a relatively high rate.