Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

time to read 5 min | 926 words

We didn’t plan to make a lot of changes on the client side for RavenDB 4.0. We wanted to do most changes on the server side, and just adapt the client when we were doing something differently.

However, we ran into an interesting problem. A while ago we asked Idan, a new guy at the company, to write a Python client for RavenDB as part of the onboarding process. Unlike with the JVM client, we didn’t want an exact duplication with just the changes needed to match the new platform. We basically told him: go ahead and do this. And he went, and he did. But along the way he got to choose his own approach for the implementation, and he didn’t copy the same internal architecture. The end result is that the Python client is significantly simpler than the C# one.

There are a few reasons for that. To start with, the Python client doesn’t need to implement features such as async or Linq. But for the most part, it is because Idan was able to look at the entire system, grasp it, and then just implement the whole thing in one go.

The RavenDB network layer on the client side has gone through many changes over the years. In particular, we have the following concerns handled in this layer:

  • Actual sending of requests to the server.
  • High availability, Failover, load balancing and SLA matching.
  • Authentication of the requests.
  • Caching server responses.

I’m probably ignoring a few that snuck in there, but those should be the main ones. The primary responsibility of the network layer in RavenDB is to send requests to the server (with all the concerns mentioned above) and give us the JSON object back.

The problem is that right now we have those responsibilities scattered all over the place. Each was added at a different time, in several different ways, and the handling is done in very different locations in the code. This leads to complexity, and seeing everything in one place in the Python client is a great motivation to simplify things. So we are going to do that.


Having a single location where all of those concerns are handled will make things simpler for us. But we can do better. Instead of just returning a JSON object, we can solve a few additional issues: performance, stability and ease of deployment.

We can do that by removing the internal dependency on JSON.Net. A while ago we got tired of conflicting JSON.Net versions, and we decided to just internalize it in our code. That led to a much simpler deployment story, but it does add some level of complexity if you want to customize how your entities go over the wire and into RavenDB (because you have to use our own copy of JSON.Net for that). And it complicates getting new JSON.Net updates, which we want.

So we are going to do something quite interesting here. We are going to drop the dependency on JSON.Net entirely. We already have a JSON parser, and one that is extremely efficient, using the blittable format. More importantly, it can write directly to native memory. That gives us a very important property: it makes it very easy to store the data, already parsed, with a well-known size. Let me try to explain my reasoning.

A very important feature of RavenDB is the idea that it can cache requests for you. When you make a request to the server, the server returns the reply as well as an etag. The next time you make this request, you’ll send the same etag, and if the response hasn’t changed, the server can just tell you to use the value in the cache. Right now we are storing the full string of the response in the cache. That leads to a few issues; in particular, while we save on the I/O of sending the response, we still need to parse the JSON, and we need to keep relatively large (sometimes very large) .NET strings around for a long time.
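
To make that flow concrete, here is a minimal sketch of an etag-based cache on the client side. The names (EtagCache, Entry) are illustrative only, and it stores plain strings where RavenDB 4.0 will keep blittable data, but the shape of the conversation with the server is the same:

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

public class EtagCache
{
    private class Entry { public string Etag; public string Body; }

    private readonly ConcurrentDictionary<string, Entry> _items = new ConcurrentDictionary<string, Entry>();
    private readonly HttpClient _client = new HttpClient();

    public async Task<string> GetAsync(string url)
    {
        _items.TryGetValue(url, out var cached);

        var request = new HttpRequestMessage(HttpMethod.Get, url);
        if (cached != null)
            request.Headers.TryAddWithoutValidation("If-None-Match", cached.Etag); // send the etag we remember

        using (var response = await _client.SendAsync(request))
        {
            if (response.StatusCode == HttpStatusCode.NotModified && cached != null)
                return cached.Body; // the server says our copy is current, nothing to download or parse

            var body = await response.Content.ReadAsStringAsync();
            var etag = response.Headers.ETag?.Tag;
            if (etag != null)
                _items[url] = new Entry { Etag = etag, Body = body };
            return body;
        }
    }
}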

But if we use blittable data in the client cache, then we have no need to do additional parsing, and we don’t need to involve the GC on the client side to collect things; we can manage that explicitly and directly. So the second time you make a request, we’ll hit the server, learn that the value is still relevant, and then just use it.

That is great for us, except for one thing. JSON.Net does a really great job of translating between JSON and .NET objects, and that isn’t something we want to reimplement. Instead, we are going to handle blittable objects throughout the RavenDB codebase. We don’t actually need to deal with the user’s .NET types until the very top layers of the system. And we can handle that with a convention that will call into JSON.Net (the only place where that will happen) and translate .NET objects to JSON and back. The nice thing about it is that since this is just a single location where this happens, we can do it dynamically, without having a hard dependency on a specific JSON.Net version.
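
A minimal sketch of what such a convention could look like (the names here are illustrative, not the actual client API). The defaults call into JSON.Net, but because the entry point is a simple delegate, the user can swap in something else without the client taking a hard dependency on a specific serializer:

using System;
using Newtonsoft.Json;

public class SerializationConventions
{
    // Called only at the very top layer, when user .NET types enter or leave the client.
    public Func<object, string> SerializeEntity { get; set; } =
        entity => JsonConvert.SerializeObject(entity);

    public Func<string, Type, object> DeserializeEntity { get; set; } =
        (json, type) => JsonConvert.DeserializeObject(json, type);
}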

That, in turn, also exposes the ability to do the JSON <-> .NET object translation using other serializers, such as Jil, for example, or whatever the end user decides.

time to read 12 min | 2270 words

Replication with RavenDB is one of our core features, something that we have had in the product from the very first release (although we did simplify things by several orders of magnitude over the years). Replication is responsible for high availability, load balancing and several other goodies. For the most part, replication works quite well, and it is a lot less complex than some of the other things that grew over the years (LoadDocument, for example). That said, it doesn’t mean that it can’t be improved. And since this is such an important aspect of RavenDB, we spent quite a lot of time seeing what we could do to improve it.

Here are the basic design guidelines:

  • RavenDB is going to remain a multi-master system, where each node can accept writes and distribute them to its siblings.
    • We intend to use Raft for dynamic leader selection, but that is a layer on top of the basic replication.
    • That means that RavenDB is an AP system, and needs to handle conflicts.
  • We mostly deal with fully connected graphs of relatively small clusters (less than 10 nodes).
    • Higher numbers of nodes are quite frequent, but they don’t use a mesh topology; they typically go for a hierarchy.

This post is going to focus solely on the server-side aspects of replication; I’ll do another post about changes from the client’s perspective.

Probably the first thing that we intend to change is how we track the replication history. Currently, we track the last 50 changes made on a document. This has several problems:

  • if there have been more than 50 changes on a document between replication batches, we’ll get a false conflict.
  • if the documents are small, in many cases the replication metadata is actually bigger than the document itself.

We are going to move to an explicit vector clock implementation. This is a bit complex, because there are multiple concepts that we need to track concurrently here.

Every time a document changes, the server generates an etag for that change. This etag is an int64 number that is always increasing. It is used for optimistic concurrency, indexing, etc. The etag value is per server, and cannot be used across servers. Each server has a unique identifier. Joining the two together, whenever a document is changed on a server directly (not via replication), we’ll stamp it with the server id and the current document etag.

In other words, let us imagine the following set of operations happening in a three node cluster.

Users/1 is created on Server A, it gets an etag of 1 and a vector clock of {A:1}. Users/2 is created on Server A, it gets an etag of 2 and a vector clock of {A:2}. Users/3 is created on Server C, it gets etag 1 (because etags are local per server) and its vector clock is {C:1}. Servers A and C both replicate to Server B, and to each other, resulting in the following cluster wide setup:

            Server A         Server B         Server C
Users/1     etag 1, {A:1}    etag 1, {A:1}    etag 2, {A:1}
Users/2     etag 2, {A:2}    etag 3, {A:2}    etag 3, {A:2}
Users/3     etag 3, {C:1}    etag 2, {C:1}    etag 1, {C:1}

Note that the etags assigned for each document are not consistent across the different servers, but that they are temporally consistent with respect to the writes. In other words, Users/1 will always have a lower etag than Users/2.

Now, when we modify Users/3 on server B, we’ll get the following cluster wide picture:

            Server A             Server B             Server C
Users/1     etag 1, {A:1}        etag 1, {A:1}        etag 2, {A:1}
Users/2     etag 2, {A:2}        etag 3, {A:2}        etag 3, {A:2}
Users/3     etag 4, {B:4,C:1}    etag 4, {B:4,C:1}    etag 4, {B:4,C:1}

As I said, only changes made on the server directly (and not via replication) will impact the document’s vector clock, but any modification (replication or directly on the node) will modify a document’s etag.

Using such vector clocks, we gain two major features. First, it is very easy to see if we have conflicting changes. {C:1, B:4} is obviously a parent of {C:4,B:6}, while {C:2,A:6} is a conflict. The other is that we can now form a very easy view of the kind of changes that we have received. We do that using a server-wide vector clock. In the case of the table above, the server-wide vector clock would be {A:2,B:4,C:1}. In other words, it will contain the latest etag seen from each server.
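
A sketch of that comparison, using a simple dictionary from server id to etag (the real implementation will use a more compact representation, but the logic is the same):

using System;
using System.Collections.Generic;
using System.Linq;

public enum ClockRelation { Equal, Ancestor, Descendant, Conflict }

public static class VectorClocks
{
    // Compares 'local' to 'remote'. Ancestor means the local version is causally older than the remote one.
    public static ClockRelation Compare(Dictionary<string, long> local, Dictionary<string, long> remote)
    {
        bool localHasLarger = false, remoteHasLarger = false;
        foreach (var server in local.Keys.Union(remote.Keys))
        {
            local.TryGetValue(server, out long localEtag);
            remote.TryGetValue(server, out long remoteEtag);
            if (localEtag > remoteEtag) localHasLarger = true;
            if (remoteEtag > localEtag) remoteHasLarger = true;
        }
        if (localHasLarger && remoteHasLarger) return ClockRelation.Conflict;   // e.g. {C:2,A:6} vs {C:4,B:6}
        if (remoteHasLarger) return ClockRelation.Ancestor;                     // e.g. {C:1,B:4} vs {C:4,B:6}
        if (localHasLarger) return ClockRelation.Descendant;
        return ClockRelation.Equal;
    }
}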

We’ll get to why exactly this is important for us in a bit. For now, just accept that it does, because the next part is about how we are going to actually do the replication. In previous versions of RavenDB, we did each replication batch through a separate REST call to the remote server. This has a few disadvantages. It meant that we had to authenticate every single time, and we couldn’t make any assumptions about the state of the remote server.

In RavenDB 4.0, we intend to move replication to use WebSockets exclusively. On startup, a node will connect to all its siblings, and stay connected to them (retrying the connection on any interruption). This has the nice benefit of only doing authentication once, of course, but far more interesting from our perspective is the fact that we can rely on the state of the server on the other side. TCP has a few very interesting properties for us here. In particular, it guarantees ordered delivery of messages, which means that we can assume that once we sent a message to a server on a TCP connection, it either got it, or the TCP connection will return an error at some point, forcing us to reconnect.

In other words, it isn’t just authentication that I can do just once. I can also query the remote server for its state (as it regards me), and since I’m the only one who can talk as myself, and I’m the one sending the details, as long as the connection lasts, I know what the other side knows about me. Confusing, isn’t it?

But basically it means that instead of having to ask on each batch what is the last document that the destination server saw from me, I can assume that the last document I sent was received. That lasts until the connection breaks, in which case I need to figure out what actually arrived. This seems like a small thing, but it will actually allow me to cut the number of roundtrips for a batch in half. There are other aspects here that are really nice: I get to piggyback on TCP’s congestion control, so if the remote server is slow in accepting updates, it will (eventually) show up as a blocking write on my end. That seems like a bad thing, right? But this is actually what I want.

Each destination server in RavenDB 4.0 is going to get its own dedicated thread. This thread will manage all outgoing communication with this server. That gives us several really important behaviors. It means that we can easily account for problems by just looking at the thread responsible (hm… I see that replication to node C is consuming a lot of CPU), and it also turns the entire replication process into a pretty simple single-threaded operation. Because of the blittable format, we don’t need complex prefetching strategies or sharing of memory in the replication, and a slow node will not impact any other replication behavior. That, in turn, basically means a thread per connection (see the previous discussion on the expected number of nodes being relatively small) and a very simple programming / error handling / communication model.

The replication sending logic goes something like this:

Yes, my scratch pad language is still Boo (Python, if you aren’t familiar with it), and this is meant to convey how simple that thing is. All the complexity that we currently have to deal with is out. Of course, the real code will need to have error handling, reconnection logic, etc, but that is roughly all you’ll need.

Actually, that is a lie. The problem with the code above is that it doesn’t work well with multiple servers. In other words, it is perfect for two nodes replicating to one another, but when you have multiple nodes, you don’t want a single document update to be replicated from each node to every other node. That is why we have the concept of vector clocks. At the document level, this serves as an easy way to detect conflicts and see which version of a document is causally later than another. But at the server level, we gather the latest writes from all the nodes we saw to get the server-wide vector clock.

When a document is modified on a server, that server will immediately send that document to all its siblings, because there is no way that they already have it. But if a document was replicated to a node, that node will not start replicating it right away. Instead, it will let a set amount of time go by (defaulting to once a minute) and then ask each sibling what is the latest server-wide vector clock that it is aware of. If the remote vector clock is equal to or higher than the local server-wide vector clock, then we know that they are up to date. In this case, the local server will let the remote server know that it is a match up to the current etag on that server.

If, however, the remote vector clock is smaller than (or in conflict with) the local one, then we need to send the relevant documents. We already know the last etag that the remote server has from us (we negotiated that when we established the connection, and we update it every time we send a document to the remote server). Since we have the current vector clock from the remote server, we aren’t going to just blindly send all documents after the last etag we sent to the remote server. Instead, we are going to check each of those to see if the vector clock for the document is larger than (or in conflict with) the remote server’s vector clock. In this way, we send the remote server only the documents that it doesn’t have.
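
Continuing the sketch from earlier, the filtering step could look roughly like this (Document and the ClockRelation/Compare helper are illustrative types, not the actual implementation):

using System.Collections.Generic;

public class Document
{
    public string Key;
    public long Etag;
    public Dictionary<string, long> Clock;
}

public static class ReplicationFiltering
{
    // 'candidates' are the documents after the last etag we already sent to this destination.
    public static IEnumerable<Document> DocumentsToSend(IEnumerable<Document> candidates,
                                                        Dictionary<string, long> remoteServerWideClock)
    {
        foreach (var doc in candidates)
        {
            var relation = VectorClocks.Compare(doc.Clock, remoteServerWideClock);
            // Only send what the other side can't already have: documents that are causally
            // newer than, or in conflict with, the remote server-wide vector clock.
            if (relation == ClockRelation.Descendant || relation == ClockRelation.Conflict)
                yield return doc;
        }
    }
}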

What about delayed servers? If we have a new node in the cluster, and we just started replicating to it, what happens when a new document is written? Above, I mentioned that the server it was written to will immediately send it to all its siblings, but that is an over-simplification. An extremely important property of RavenDB replication is that documents are always replicated in the order the server saw them (either written to it directly, or in the order they were replicated to it). If we allow a server to replicate documents directly to another server, that might break this guarantee. Looking at the code above, it would also require us to write a separate code path to handle such things. But that is the beauty of this design. All of this logic is actually encapsulated in WaitForMoreDocuments(). You can think of WaitForMoreDocuments() as a simple manual reset event. Whenever a document is written to us directly, it will be set. But not when a document is replicated to us.

So WaitForMoreDocuments() will basically wait for a document to be written to us, or for a timeout, in which case it will check with its sibling for new stuff that needs to go over the wire because it was replicated to us. But the code is the same code, and the behavior is the same. If we are busy sending data to a new server? We’ll still set the event, but that will have no effect on the actual behavior. And when we are working with a fully caught-up server, the act of writing a single document will immediately free the replication threads to start sending it to the siblings. All the desired behaviors, and very little actual complexity.
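
A rough sketch of that event, just to show how little machinery is involved (the real code lives inside the replication thread, and the names here are illustrative):

using System;
using System.Threading;

public class ReplicationSignal
{
    private readonly ManualResetEventSlim _documentWritten = new ManualResetEventSlim(false);

    // Called only when a document is written to this node directly, never for replicated documents.
    public void OnDirectWrite() => _documentWritten.Set();

    // Returns true if we were woken by a direct write, false if the timeout elapsed
    // (in which case the caller goes and compares server-wide vector clocks with its sibling).
    public bool WaitForMoreDocuments(TimeSpan timeout)
    {
        if (_documentWritten.Wait(timeout) == false)
            return false;
        _documentWritten.Reset();
        return true;
    }
}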

On the receiving end, we get just the documents we don’t have, as well as the last etag from that source server (which we’ll keep in persistent storage). Whenever we get a new document, we’ll check if it is conflicting. If so, we’ll mark the document as conflicting and allow the user to define default strategies to handle that (latest, resolve to remote, resolve to local). But we are also going to allow the user to define a Javascript function that will merge the conflicted documents directly. This way you can have your business logic for the resolution directly on the server, and you’ll never actually see any conflicts externally.

There are quite a lot of small details that I’m skipping, but this is already long enough, and should give you a pretty good idea about where we are headed.

time to read 4 min | 748 words

We have been trying to get RavenDB to run on Linux for over 4 years. A large portion of our motivation to build Voron was that it would also allow us to run on Linux natively, and free us from dependencies on Windows OS versions.

The attempt was actually made several times, and Voron has been running successfully on Linux for the past 2 years, but Mono was never really good enough for our needs. My hypothesis is that if we had been working with it from day one, it would have been sort of possible to do it. But trying to port a non-trivial codebase (quite a bit more complex and demanding than your run-of-the-mill business app) to Mono after the fact was just a no-go. There was too much that we did in ways that Mono just couldn’t handle, from GC corruption to plain “no one ever called this method ever” bugs. We hired a full-time developer to handle porting to Linux, and after about six months of effort, all we had to show for it was !@&#! and stuff that would randomly crash in the Mono VM.

The CoreCLR changed things dramatically. It still takes a lot of work, but now it isn’t about fighting tooth and nail to get anything working. My issues with the CoreCLR are primarily in the area of “I wanna have more of the goodies”. We had our share of issues porting. Some of them were obvious: a very different I/O subsystem and behaviors. Others were just weird (you can’t convince me that the Out Of Memory Killer is the way things are supposed to be, or the fsync dance for creating files), and a lot of them were expected (case-sensitive paths, / vs \, etc.). But pretty much all of this was what it was supposed to be. We would have seen the same stuff if we were working in C.

So right now, we have RavenDB 4.0 running on:

  • Windows x64 arch
  • Linux x64 arch

We are working on getting it running on Windows and Linux in 32 bits mode as well, and we hope to be able to run it on ARM (a lot of that depends on the pace of porting the CoreCLR to ARM, which seems to be moving along quite nicely).

While there is still a lot to be done, let me take you into a tour of what we already have.

First, the setup instructions:

(image: setup commands)

This should take care of all the dependencies (including installing CoreCLR if needed), and run all the tests.

You can now run the dnx command (or the dotnet cli, as soon as that becomes stable enough for us to use), which will give you RavenDB on Linux:

(image: RavenDB running on Linux)

By and large, RavenDB on Windows and Linux behaves pretty much in the same manner. But there are some differences.

I mentioned the out of memory killer nonsense behavior, right? Instead of relying on swap files and the inherent unreliability of Linux memory allocations, we create temporary files and map them as our scratch buffers, to avoid the OS suddenly deciding that we are nasty memory hogs and that it needs some bacon. Windows has features that allow the OS to tell applications that it is about to run out of memory, and we can respond to that. On Linux, the OS goes on a killing spree, so we need to monitor that actively and take steps accordingly.

Even so, administrators are expected to set vm.overcommit_memory and vm.oom-kill to proper values (2 and 0, respectively, are the values we are currently recommending, but that might change).

WebSockets client handling is also currently not available on the CoreCLR for Linux. We have our own trivial implementation based on TcpClient, which currently supports only HTTP. We’ll replace that with the real implementation as soon as the functionality becomes available on Linux.

Right now we are seeing identical behaviors on Linux and Windows, with similar performance profiles, and are quite excited by this.

time to read 7 min | 1351 words

LoadDocument in RavenDB is a really nice feature. It allows you to reach out to another document during indexing, and load its value. A simple example of that would be:

from p in docs.Pets
select new { Name = p.Name, OwnerName = LoadDocument(p.OwnerId).Name }

When we got the idea for LoadDocument, we were very excited, because it allowed us to solve some very tough problems.

Unfortunately, this also has some really nasty implementation issues. In particular, one of the promises we give for LoadDocument is that we’ll re-index the referencing document if the referenced document changed. In other words, if my wife changed her name, even though my document wasn’t changed, it will be re-indexed, my wife’s document will be loaded and the new name will end up in the index.

Now, consider what happens when there are two concurrent transactions. The first transaction happens during indexing: we try to load the owner document, which doesn’t exist, so we leave a record in place to force re-indexing when it is inserted. But at the same time, a new transaction is opened and the owner document is inserted. During the insert, it checks if there are any referencing documents, but since neither transaction is committed yet, they can’t see each other’s changes. And we end up with a bug. Resolving that took a bit of coordination between the two processes, which was hard to get right.

Another issue we have is the fact that each LoadDocument call needs to create a record of its existence, so we’ll know which documents require re-indexing. However, for large indexes that use LoadDocument, the number of entries there can be staggering, and it impacts the amount of time it takes to delete an index, for example. It also forces us to do a bit of work during document updates that is proportional to the number of documents referencing a particular document. In some cases, all the documents in the database reference a single document, and an update to that document can take a very long time. In fact, we limit the amount of time that this can take to 30 seconds, and abort the update if it takes that long. This is one of the only cases where insert speed is impacted in RavenDB (we have a workaround to update the document without triggering re-indexing, of course).

So, overall, we have a really nice feature, but it has some serious drawbacks when you peel back the implementation details. In RavenDB 4.0, we have decided to try as much as possible to avoid having things like that, so we sat down and tried to think how we can get something like that working.

We have the following considerations:

  • All data must be scoped to the index level. Nothing that requires multiple indexes to cooperate.
  • We cannot have any global data, or interactions between documents and indexing that require complex coordination.
  • We cannot use TouchDocument as a control mechanism any longer.
  • It should be as simple as we can get away with.

The solution we came up with goes like this (a full walkthrough can be found after the explanation):

  • An index that uses LoadDocument cannot just look at the items in the collections it covers; it needs to go over all documents.
    • We can probably get away with only scanning the documents from collections that we loaded documents from, but what if the document doesn’t exist yet? In that case, we need to scan all documents anyway.
    • We’ll have an overload of LoadDocument that specifies the collection type (which we can auto-fill from the client side based on the provided type) to optimize this.
  • A call to LoadDocument is going to record the relationship between the two documents, in the index’s own storage (unlike before, there is no global tracking). Conceptually, you can think of that storage as a “references table”, with the source document key and the destination document key. In practice, we’ll use an optimal data structure for this, but it is easier if you imagine a table with those two columns.
  • Instead of using TouchDocument to modify a document’s etag (which requires us to mix indexing and document operations), the index will keep track of two sets of etags. The first is for the index’s own collection of documents it is indexing, and it is known as the “last indexed etag”. The second is the last etag of the documents that are being referenced via LoadDocument by this index, and it is known as the “last referenced etag”.
  • When a document from a collection that is being referenced is updated, we’ll wake the index and check all the documents in that collection after the last referenced etag we have. For each of those, we’ll see if they have any references in the “references table”. If they don’t, there is nothing to do. If they do, we’ll re-index the referencing documents immediately (see below for some optimization opportunities there).
  • The index will then update the last referenced etag it scanned.
  • Such an index will be considered non-stale if both the last indexed etag and the last referenced etag are equal to the last document etag in the database.

Basically, we move the entire responsibility of updating the index from the database as a whole to just the index.

It also makes the index in question alone pay for those costs. And given that we have a separate “re-indexing” round, we can track the additional costs of such a measure directly.

It is a lot to take in, so let me try to explain in detail. We start with the new index definition.

from p in docs.Pets
select new { Name = p.Name, OwnerName = LoadDocument(p.OwnerId, "People").Name }

The first change is that the LoadDocument call is going to specify the collection of the loaded document (or no collection, if the document can come from any collection).

The index is going to keep track of the following details:

  • LastIndexedEtag – for the collection that this covers, in this case, the “Pets” collection.
  • LastReferencedEtag – for the collection(s) specified in the LoadDocument, in this case, the People collection.

We now have the following state in the database:

  • LastIndexedEtag is 10 for the Pets collection.
  • LastReferencedEtag is 0 for the People collection.
  • People/1’s etag is set to 12.
  • Pets/1’s etag is set to 7.
  • Pets/2’s etag is set to 11.

Now, when indexing, we are going to do the following steps:

  • For each of the collections we have set up tracking for, get all documents following the LastReferencedEtag.
    • In this case, scan the People collection for all etags following 0.
    • For each of the resulting documents, check whether there are documents referencing that document.
      • In this case, people/1 is returned, and it is being referenced by pets/1 and pets/2.
      • Because the etag of pets/1 (7) is lower than the LastIndexedEtag (10), we need to index it right away.
      • The etag of pets/2 (11) is higher than the LastIndexedEtag (10), so we don’t index it.
    • After we are done scanning through the People collection, we update our LastReferencedEtag to the last item in the People collection (which would be 12).
  • We then continue to index the Pets collection normally.
    • We get pets/2, whose etag is 11, and index that, loading people/1 again (this is why we could skip it previously).
    • Finally, we update our LastIndexedEtag to 11 (the last Pets document we indexed).

On the next batch of indexing, we’ll again scan the People collection for documents that have changed, and then the pets that changed, and so on.
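
To tie the walkthrough together, here is a rough sketch of a single indexing batch under this design. The types and storage calls are illustrative only; the real code works against Voron and the blittable format:

using System;
using System.Collections.Generic;

public class Doc { public string Key; public long Etag; }

public interface IDocumentStorage
{
    IEnumerable<Doc> GetDocumentsAfter(string collection, long etag); // in etag order
    Doc Get(string key);
}

public interface IReferencesTable
{
    IEnumerable<string> GetReferencing(string referencedKey); // keys of documents that LoadDocument'ed this key
}

public class ReferenceAwareIndex
{
    public string Collection;                                        // e.g. "Pets"
    public List<string> ReferencedCollections = new List<string>();  // e.g. { "People" }
    public long LastIndexedEtag;
    public Dictionary<string, long> LastReferencedEtags = new Dictionary<string, long>();
    public IDocumentStorage Storage;
    public IReferencesTable References;

    public void RunBatch()
    {
        // Step 1: scan the referenced collections for changes and re-index the documents pointing at them.
        foreach (var collection in ReferencedCollections)
        {
            LastReferencedEtags.TryGetValue(collection, out long lastSeen);
            foreach (var referenced in Storage.GetDocumentsAfter(collection, lastSeen))
            {
                foreach (var sourceKey in References.GetReferencing(referenced.Key))
                {
                    var source = Storage.Get(sourceKey);
                    // Re-index only documents the normal pass already went past;
                    // anything newer will be picked up by step 2 anyway.
                    if (source != null && source.Etag <= LastIndexedEtag)
                        IndexDocument(source);
                }
                lastSeen = referenced.Etag;
            }
            LastReferencedEtags[collection] = lastSeen;
        }

        // Step 2: the normal indexing pass over the index's own collection.
        foreach (var doc in Storage.GetDocumentsAfter(Collection, LastIndexedEtag))
        {
            IndexDocument(doc); // LoadDocument calls made in here record rows in the references table
            LastIndexedEtag = doc.Etag;
        }
    }

    private void IndexDocument(Doc doc)
    {
        // Run the map function and write the result to the index.
    }
}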

Now, a document that is being referenced by many other documents will not require any additional work on our side. We’ll just re-index the documents referencing it, which is much better than the current state.

Note that this design ignores a few details, but this should paint the general outline.

time to read 6 min | 1009 words

Map/Reduce is a core part of RavenDB, one of the earliest features that we implemented and something that we have worked to improve many times. You can read my original blog post about them. In the current codebase, Map/Reduce is also one of the scariest pieces of code, and one of the most fragile. The good thing is that we have a really high number of tests around it. The sad thing is that it takes a long time to make any modification there stick without breaking something else.

It is a complex topic from the get-go, and having a performant version of it was not trivial, so, all put together, this is one of the more complex pieces of code in RavenDB. I’m not going to go into the details of how it currently works; it is that complex.

That complexity really bugged me. This is an area of the code that only a few of us had approached, and usually with various levels of dread. We spent so much time simplifying things in RavenDB 4.0 that I couldn’t really abide the concept of having to bring this complexity over and having to live with it. But we couldn’t find a better way to do it.

Until I realized that I was thinking about this at totally the wrong level. The way to handle that was to reduce the level of abstraction down, way down. Again, being able to control the storage at a very low level helps a lot.

I have talked before about B+Trees, and this time, we are going to use their structure directly. Here is how it works. Let us assume that we have the following map/reduce index:
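
The definition itself isn’t the interesting part; any map/reduce that groups by product and sums up a total will do. For illustration, something along these lines:

from order in docs.Orders
from line in order.Lines
select new { Product = line.Product, Total = line.Quantity * line.Price }

from result in results
group result by result.Product into g
select new { Product = g.Key, Total = g.Sum(x => x.Total) }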

This operates on orders, and gives us the total purchases per product. Now, for the actual implementation. We first run the map function over a set of orders, giving us the mapped results, which we’ll store in a B+Tree. A B+Tree is a key/value structure, and previously we used composite keys made from the reduce key, the document key, and a whole bunch of other stuff. That led to a whole lot of complications.

Now, we are going to have a separate tree per reduce key. Confusing? Let’s see if I can explain. Each reduce key (a reduce key is the thing that you group by; in this case, that is the product id) is going to have its own dedicated tree. The content of that tree is going to be the document id as the key of the tree, and the mapped result for that particular reduce key as the value. So far, that isn’t really impressive. We changed things around: instead of having a single tree for all mapped results, we are going to have a tree per reduce key.

But here is where it gets interesting. B+Trees are… well, trees. That means that they are hierarchical in nature. What we did was ask the tree to let us know about all the changed pages. Consider the image below, which shows the status of the database after indexing a few orders that have line items for products/17 and products/10.

(image: the B+Trees for products/10 and products/17 after indexing a few orders)

We are now going to update orders/2. So the next thing that we need to do is to run the map over the updated order, giving us new entries for products/17 and products/10, each of them in a different tree. Because we can ask the tree which pages have changed, we can now do the following:

  • For each changed page:
    • calculate the new value of the page.
    • mark the parent page as changed
  • Repeat until there are no changed pages.

Let us see this in detail. In Page 14, which is the (small) tree for products/10, we don’t really have much to do. We just need to run the reduce over all the entries in the page, and we have the final result. Things are more interesting with products/17.

Here, we updated Page 31 during the map process. When we get to the reduce, we discover that, which means that we need to re-reduce Page 31. But that isn’t the end. Now we also need to update things upward; in our case, we need to update Page 13. You might have noticed the list on the side: there is a computed reduce value per page there. When we are done updating Page 31, we go to its parent, Page 13. We get the computed values for pages 31, 44 and 32 and reduce those, giving us the final value for Page 13. At which point, we go up yet again, and reduce Page 13 and Page 33 together, resulting in the final value.

All in all, this is pretty simple to explain, was very easy to implement and is going to handle the complexities of map/reduce in a dramatically better fashion.
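
To make the mechanics obvious, here is a toy version of the “re-reduce changed pages and bubble upward” idea. The page structure is heavily simplified (the real code works directly against Voron pages, and de-duplicates the pending pages), but the flow matches the steps above:

using System;
using System.Collections.Generic;
using System.Linq;

public class ReducePage
{
    public ReducePage Parent;
    public List<ReducePage> Children = new List<ReducePage>(); // empty for leaf pages
    public List<decimal> LeafValues = new List<decimal>();     // mapped results stored in a leaf page
    public decimal ComputedReduceValue;                        // the cached reduce result for this page
}

public static class TreeReducer
{
    // 'changedPages' is what the tree reports after the map phase updated some leaves.
    public static void ReReduce(IEnumerable<ReducePage> changedPages)
    {
        var pending = new Queue<ReducePage>(changedPages);
        while (pending.Count > 0)
        {
            var page = pending.Dequeue();
            page.ComputedReduceValue = page.Children.Count == 0
                ? page.LeafValues.Sum()                           // re-reduce the entries in the leaf
                : page.Children.Sum(c => c.ComputedReduceValue);  // re-reduce the children's cached results
            if (page.Parent != null)
                pending.Enqueue(page.Parent);                     // mark the parent page as changed
        }
    }
}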

In the example above, I’m showing it working with very few entries per page. In practice, we are usually talking about roughly 200 entries per page, so once a reduce key has more than 200 entries, we start using the multi-level steps.

Our go-to example for map/reduce was the US census. 310 million people, more or less, and we want to build a count of how many people per state. That gives us California, with roughly 40 million people in it. Adding a new person to California using this system will result in a B+Tree that has a depth of 5, and re-reducing after an update will take less than a thousand operations to re-compute it. And that is for an update / delete operation.

If we have a new value, we can skip the whole thing, and re-update the map/reduce in about 6 operations. And the entire codebase is easy to read, and we expect it to be much faster as well.

time to read 4 min | 629 words

In my last post on the topic, I discussed physically separating documents of different collections. This post is about the same concept, but applied at a much higher level. In RavenDB, along with the actual indexing data, we also need to keep track of quite a few details. What did we last index, what is our current state, any errors that happened, keep track of referenced documents, etc. For map/reduce indexes, we have quite a bit more data that we need to work with, all the intermediate results of the map/reduce process, along with bookkeeping information about how to efficiently reduce additional values, etc.

All of that information is stored in the same set of files as the documents themselves. As far as the user is concerned, this is mostly relevant when we need to delete an index. Because on large databases the deletion of a big index can take a while, this was an operational issue. In RavenDB 3.0 we changed things so index deletion would be async, which improved matters significantly. But on large databases with many indexes, that still got us into problems.

Because all the indexes were using the same underlying storage, that meant that the number of values that we had to track was high, and it was proportional to the number of indexes and the number of documents they indexed. That means that in a particular database with a hundred million documents and three map/reduce indexes, we had to keep track of over half a billion entries. B+Trees are really amazing creatures, but one of their downsides is that once they get to a certain size, they slow down as the cost of traversing the tree becomes very high.

In relational terms, we put all the indexing data into a single table, and had an IndexId column to distinguish between the different records. And once the table got big enough, we had issues.

One of the design decisions we made in the build-up to RavenDB 4.0 was to remove multi-threaded behavior inside Voron, and that led to an interesting problem with having everything in the same Voron storage: we wouldn’t be able to index and accept new documents at the same time (I’ll have another post about this design decision).

The single-threaded nature and the problems with index deletion have led us toward an interesting decision. A RavenDB database isn’t actually composed of a single Voron storage. It is composed of multiple storages, each of them operating independently of the others.

The first one, obviously, is for the documents. But each of the indexes now has its own Voron storage. That means that they are totally independent from one another, which leads to a few interesting implications:

  • Deleting an index is as simple as shutting down the indexing for this index and then deleting the Voron directory from the file system.
  • Each index has its own independent data structures, so having multiple big indexes isn’t going to cause us to pay the price of all of them together.
  • Because each index has a dedicated thread, we aren’t going to see any complex coordination between multiple actors needing to use the same Voron storage.

This is important, because in RavenDB 4.0, we are also storing the actual Lucene index inside the Voron storage, so the amount of work that we now require Voron to deal with is much higher. By splitting things along index lines, we have saved ourselves a whole bunch of headaches on how to manage them properly.
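
To give a feel for what this buys us operationally, here is a sketch of index deletion under this model (the names and layout are illustrative, not the actual implementation):

using System;
using System.IO;

public interface IIndexHandle
{
    string Name { get; }
    void StopIndexingThread();
    void DisposeStorage();
}

public class IndexStorageManager
{
    private readonly string _databasePath;

    public IndexStorageManager(string databasePath)
    {
        _databasePath = databasePath;
    }

    // Each index lives in its own directory, backed by its own Voron environment.
    public string GetIndexStoragePath(string indexName) =>
        Path.Combine(_databasePath, "Indexes", indexName);

    // Deleting an index: stop its dedicated thread, dispose its storage, and remove the directory.
    // There are no shared tables to clean up and no half a billion entries to delete one by one.
    public void DeleteIndex(IIndexHandle index)
    {
        index.StopIndexingThread();
        index.DisposeStorage();
        Directory.Delete(GetIndexStoragePath(index.Name), recursive: true);
    }
}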

As a reminder, we have the RavenDB Conference in Texas shortly, which would be an excellent opportunity to discuss RavenDB 4.0 and see what we already have done.


time to read 3 min | 471 words

(image: “Kill it with fire” aliens)

When we started to build Voron, we based some of its behavior around how LevelDB works. While the storage details are heavily influenced by LMDB, we did take a few things from LevelDB. In particular, the idea of transaction merging.

You can read about our implementation in our design notes for Voron from 2013. The idea is that even though you have a single writer, you can prepare the transaction separately, then submit the transaction to be processed by a dedicated thread. This thread will merge all pending transaction requests into a single physical transaction and allow us to parallelize some of the work, amortizing the cost of going to disk across multiple concurrent transactions.

This is how Voron is running now, and it was a feature that I, personally, was very excited about.  And in RavenDB 4.0, we killed this feature.

If this is such a great and exciting feature, why kill it and go to a single writer only mode?

There are actually several distinct reasons, each of them serving as a big black mark against this feature.

The first strike against this feature is that it results in much higher memory usage and copying of data. Whenever we need to create a transaction, we have to write all the data into a temporary buffer, which is then sent to the transaction merger. This results in memory hanging around longer, higher allocations, and double copying of the data.

The second strike against this feature is that it results in unpredictable behavior. Because transactions are merged on a first come / first served basis, small differences in the execution of transactions can dramatically change the order of operations that is actually committed. Usually it doesn’t matter, but if we need to track down a particular issue, that is really important. Having a single writer means that we have very predictable behavior.

The third strike against this feature is that it leads to concurrency-aware code. Because you are going to submit a transaction to be processed, there are potentially other transactions that can change the data you rely on. We have ways to handle that, by requesting optimistic concurrency checks, but this ends up being quite complex to manage properly.

The fourth strike against this feature is that the major reason it was needed was that we wanted to be able to parallelize the work of indexing and documents, and it was meant to handle just that. But the re-shaping of the index storage and document storage means that we have separate Voron storages for the documents and for each index, so we still have this ability, but we were able to remove this code and reduce our complexity significantly.

time to read 5 min | 915 words

When we started writing RavenDB, the idea of a collection was this notion of “just a way to say that those documents are roughly similar”. We were deep in the schemaless nature of the system, and it made very little sense at the time to physically split different documents. By having all documents in the same location (and by that, I mean that they were all effectively stored in the same physical format, and the only way to tell the difference between a User document and an Order document was by reading their metadata), we were able to do some really cool things. Indexes could operate over multiple collections easily, replication was simple, exporting documents was a very natural operation, etc.

Over time, we learned by experience that most of the time, documents in separate collections are truly separate. They are processed differently, behave differently, and users expect to be able to operate on them differently. This is mostly visible when users have a large database and try to define an index on a small collection, and are surprised when it can take a while to index. The fact that we need to go over all the documents (because we can’t tell them apart before we read them) is not something that is in the mental model for most users.

We have worked around most of that by utilizing our own indexing structure. The Raven/DocumentsByEntityName index is used to do quite a lot. For example, we are often able to optimize the “small collection, new index” scenario using Raven/DocumentsByEntityName; deletion / patching of collections through the studio uses it; etc.

In RavenDB 4.0 we decided to see what it would take to properly segregate collections. As it turned out, this is actually quite hard to do, because of the etag property.

The etag property of RavenDB goes basically like this: Etags are numbers (128 bits in RavenDB up to 3.x, 64 bits in RavenDB 4.0 and onward) that are always increasing, and each document change will result in a higher etag being generated. You can ask RavenDB to give you all documents since a particular etag, and by continually doing this, you’ll get all documents in the database, including all updates.

This property is the key to quite a lot of stuff internally. Replication, indexing, subscriptions, exports, the works.
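
A sketch of what that pattern looks like from the consumer’s side (the names are illustrative, not the actual API):

using System;
using System.Collections.Generic;

public class DocumentAndEtag { public string Key; public long Etag; public string Json; }

public interface IDocumentSource
{
    // Returns up to 'take' documents with an etag strictly greater than 'afterEtag', in etag order.
    IReadOnlyList<DocumentAndEtag> GetDocumentsAfter(long afterEtag, int take);
}

public static class EtagTailer
{
    // Processes every document (including later updates) in etag order, resuming from lastEtag.
    public static long ProcessAll(IDocumentSource source, Action<DocumentAndEtag> process, long lastEtag = 0)
    {
        while (true)
        {
            var batch = source.GetDocumentsAfter(lastEtag, take: 1024);
            if (batch.Count == 0)
                return lastEtag;        // caught up; a real consumer would wait and ask again later
            foreach (var doc in batch)
            {
                process(doc);
                lastEtag = doc.Etag;    // remember where to resume from next time
            }
        }
    }
}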

But putting documents in separate physical locations means that we won’t have an easy way to scan through all of them. In RavenDB 3.0, we effectively have an index of [Etag, Document], and the process of getting all documents after a particular etag is extremely simple and cheap. But if we segregate collections, we’ll need to merge the information from multiple locations, which can be non-trivial, and has a complexity of O(N log N).

There is also the issue of the document keys namespace, which is global (so you can’t have a User with the document key “users/1” and an Order with the document key “users/1”).

Remember the previous post about choosing Voron as our storage engine for RavenDB 4.0? This is one of the reasons why. Because we control the storage layer, we were able to come up with an elegant solution to the problem.

Each of our collections is going to be stored in a separate physical structure, with its own location on disk, its own indexes, etc. But at the same time, all of those separate collections are also going to share a pair of indexes (document key and document etag). In this manner, we effectively index the document etag twice. First it is indexed in the global index, along with all the documents in the database, regardless of which collection they are in. This index will be used for replication, exports, etc. And it is indexed again in a per-collection index, which is what we’ll use for indexing, patching by collection, etc. In the same manner, the document key index is going to allow us to look up documents by their key without needing to know which collection they are in.

Remember, those indexes actually store an id that gives us O(1) access to the data, which means that processing them is going to be incredibly cheap.

This has a bunch of additional advantages. To start with, it means that we can drop the Raven/DocumentsByEntityName index; it is no longer required, since all its functions are now handled by those internal storage indexes.

Loaded terminology alert: “indexes” in RavenDB can refer either to the indexes that users define and are familiar with (such as the good ol’ Raven/DocumentsByEntityName) or to storage indexes, which are internal structures inside the RavenDB engine and aren’t exposed externally.

That has the nice benefit of making sure that all collection data is now transactional and is updated as part of the write transaction.

So we had to implement a relatively strange internal structure to support this segregation. But aside from the fact that “collections are physically separated from one another”, what does this actually give us?

Well, it makes certain tasks that work on a per-collection basis (such as indexing, subscriptions, patching, etc.) much easier. You don’t need to scan all documents and filter out the stuff that isn’t relevant. Instead, you can just iterate over the relevant result set directly. And that has its own advantages. Because we are storing documents in separate internal structures per collection, there is a much stronger chance that documents in the same collection will reside near one another on disk, which is going to increase performance and opens up some interesting optimization opportunities.

time to read 4 min | 647 words

I don’t like Lucene. It is an external dependency that works in somewhat funny ways, and the version we use is a relatively old one that has been mostly ported as-is from Java. This leads to some design decisions that are questionable (for example, using exceptions for control flow in parsing queries), or just awkward (by default, an error in merging segments will kill your entire process). Getting Lucene to run properly in production takes quite a bit of work and effort. So I don’t like Lucene.

We have spiked various alternatives to Lucene multiple times, but it is a hard problem, and most solutions that we looked at led toward pretty much the same approach that Lucene takes. By now, we have been working with Lucene for over eight years, so we have gotten good at managing it, but there is still quite a bit of code in RavenDB that is dedicated to managing Lucene’s state, figuring out how to recover in case of errors, etc.

Just off the top of my head, we have code to recover from aborted indexing, background processes that take regular backups of the indexes so we’ll be able to restore them in the case of an error, etc. At some point we had a lab of machines that were dedicated to testing that our code was able to manage Lucene properly in the presence of hard resets. We got it working, eventually, but it was hard. And we still get issues from users that run into trouble because Lucene can tie itself into knots (for example, a disk-full error midway through indexing can corrupt your index and require us to reset it). And that is leaving aside the joy of what I/O re-ordering does to you when you need to ensure reliability.

So the problem isn’t with what Lucene does; the problem is that it isn’t reliable. That led us to look at the Lucene persistence format. While Lucene’s persistence mode is technically pluggable, in practice this isn’t really possible. The file format and the way it works are very closely tied to the idea of files; actually, to the idea of processing data as a stream of bytes. At some point, we thought that it would be good to implement a Transactional NTFS Lucene Directory, but that idea isn’t really viable, since Transactional NTFS itself is going away.

It was at this point that we realized that we were barking at the entirely wrong tree. We already have the technology in place to make Lucene reliable: Voron!

Voron is a low-level storage engine that offers ACID transactions. All we need to do is develop a VoronLuceneDirectory, and that should handle the reliability part of the equation. There are a couple of details that need to be handled; in particular, Voron needs to know, upfront, how much data you want to write, and a single value in Voron is limited to 2GB. But that is fairly easily done. We let Lucene write to a temporary file until it tells us to commit, at which point we can write it to Voron directly (potentially breaking it into multiple values if needed).
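
The interesting bit is the commit step: take the temporary file Lucene finished writing and copy it into the storage engine in pieces that stay under the single-value limit. A minimal sketch of that splitting (the chunk size and the surrounding storage calls are illustrative):

using System;
using System.Collections.Generic;
using System.IO;

public static class ChunkedWriter
{
    // Splits a finished temporary file into chunks small enough to be stored as individual values.
    public static IEnumerable<byte[]> SplitIntoChunks(Stream input, int chunkSize)
    {
        var buffer = new byte[chunkSize];
        int read;
        while ((read = ReadFully(input, buffer)) > 0)
        {
            var chunk = new byte[read];
            Array.Copy(buffer, chunk, read);
            yield return chunk; // each chunk becomes a separate value under the Lucene file's name
        }
    }

    // Stream.Read may return less than requested, so keep reading until the buffer is full or the stream ends.
    private static int ReadFully(Stream input, byte[] buffer)
    {
        int total = 0;
        while (total < buffer.Length)
        {
            int read = input.Read(buffer, total, buffer.Length - total);
            if (read == 0) break;
            total += read;
        }
        return total;
    }
}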

Voila, we have got ourselves a reliable mechanism for storing Lucene’s data. And we can do all of that in a single atomic transaction.

When reading the data, we can skip all of the hard work and file I/O and serve it directly from Voron’s memory map. And having everything inside a single Voron file means that we can skip doing things like the compound file format Lucene is using, and choose a more optimal approach.

And with a reliable way to handle indexing, quite large swaths of code can just go away. We can now safely assume that indexes are consistent, so we don’t need to have a lot of checks on that, startup verifications, recovery modes, online backups, etc.

Improvement by omission indeed.

time to read 6 min | 1134 words

I have written extensively about the blittable format already, so I’ll not get into that again. But what I wanted to do in this post is to discuss the implication of the intersection of two very important features:

  • The blittable format requires no further action to be useful.
  • Voron is based on a memory mapped file concept.

Those two, brought together, are quite interesting.

To see why, let us consider the current state of affairs. In RavenDB 3.0, we store the data as JSON directly. Whenever we need to read a document, we need to load the document from disk, parse the JSON, load it into .NET objects, and only then do something with it. When we just got started with RavenDB, it didn’t actually matter to us. Our main concern was I/O, and that dominated all our costs. We spent multiple releases improving on that, and the solution was the prefetcher.

  • Prefetcher will load documents from the disk and make them ready to be indexed.
  • The prefetcher is running concurrently to indexing, so we can parallelize I/O and CPU work.

That allows us to reduce most of the I/O wait times, but it still left us with problems. If two indexes are working, and they each use their own prefetcher, then we have double the I/O cost, double the parsing cost, double the memory cost, double the GC cost. So in order to avoid that, we group together indexes that are roughly at the same place in their indexing. But that leads to a different set of problems: if we have one slow index, it would impact all the other indexes, so we need to have a way to “abandon” an index while it is indexing, to give the other indexes in the group a chance to run.

There is also another issue: when inserting documents into the database, we want to index them, but it seems stupid to take the documents, write them to disk, only to then load them from the disk, parse them, etc. So when we insert a new document, we add it to the prefetcher directly, saving us some work in the common case where indexes are caught up and only need to index new things. That, too, has a cost. It means that the lifetime of such objects tends to be much longer, which means that they are more likely to be pushed into Gen1 or Gen2, so they will not be collected for a while, and when they are, it will be a more expensive collection run.

Oh, and to top it off, all of the structures above need to consider available memory, load on the server, time per indexing batch, I/O rates, liveness and probably a dozen other factors that don’t pop to mind right now. In short, this is complex.

With RavenDB 4.0, we set out to remove all of this complexity. A large part of the motivation for the blittable format and using Voron are driven by the reasoning below.

If we can get to a point where we can just access the values, and reading documents won’t incur a heavy penalty in CPU/memory, we could radically shift the cost structure. Let us see how. Now, the only cost for indexing is going to be pure I/O, paging the documents to memory when we access them. Actually indexing them is done by merely accessing the mapped memory directly, so we don’t actually need to allocate much memory during indexing.

Optimizing the actual I/O is pretty easily done by just asking the operating system; we can do that explicitly using PrefetchVirtualMemory or madvise(MADV_WILLNEED), or just let the OS handle it based on the actual access pattern. So those are two separate issues that just went away completely. And without needing to spread the cost of loading the documents among all indexes, we no longer have a good reason to group indexes. So that is out the window, as well as all the complexity that is required to handle a slow index slowing down everyone.

And because newly written documents are likely to be memory resident (they have just been accessed, after all), we can just skip the whole “let us remember recently written documents for the indexes”, because by the time we index them, we are expecting them to still be in memory.

What is interesting here is that by using the right infrastructure we have been able to remove quite a lot of code. Being able to remove a lot of code is almost always great, but the major change here is that all of the code we removed had to deal with a very large number of factors (if new documents are coming in, but indexing isn’t caught up to them, we need to stop putting the new documents into the prefetcher cache and clear it) that are hard to predict and sometimes interact in funny ways. By moving a lot of that complexity to “let us manage which parts of the file are memory resident”, we can simplify things greatly and even push much of the work directly to the operating system.

This has other implications. Because we no longer need to run indexes in groups, and they can each run and do their own thing, we can now split them so each index has its own dedicated thread. Which means, in turn, that if we have a very busy index, it is going to be very easy to point at which one is the culprit. It also makes it much easier for us to handle priorities. Because each index is a thread, we can now rely on the OS prioritization. If you have an index that you really care about running as soon as possible, we can bump its priority higher. And by default, we can very easily mark the indexing threads as lower priority, so we prioritize answering incoming requests over processing indexes.
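
A sketch of the per-index thread setup this enables (names are illustrative; the real code obviously does quite a bit more):

using System;
using System.Threading;

public static class IndexThreads
{
    public static Thread StartIndexingThread(string indexName, Action indexingLoop, bool highPriority = false)
    {
        var thread = new Thread(() => indexingLoop())
        {
            Name = "Indexing of " + indexName, // an expensive index shows up by name in any thread view
            IsBackground = true,
            // Indexing defaults to below-normal priority so that answering requests wins under load;
            // an index the user really cares about can be bumped up instead.
            Priority = highPriority ? ThreadPriority.AboveNormal : ThreadPriority.BelowNormal
        };
        thread.Start();
        return thread;
    }
}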

Doing it in this manner means that we are able to ask the OS to handle the problem of starvation in the system, where an index doesn’t get to run because it has a lower priority. All of that is already handled in the OS scheduler, so we can lean on that.

Probably the hardest part of the design of RavenDB 4.0 is that we are thinking very hard about how to achieve our goals (and in many cases exceed them) not by writing code, but by not writing code; by arranging things so the right thing happens. Architecture and optimization by omission, so to speak.

As a reminder, we have the RavenDB Conference in Texas in a few months, which would be an excellent opportunity to learn about RavenDB 4.0 and the direction in which we are going.
