Ayende @ Rahien

Oren Eini aka Ayende Rahien CEO of Hibernating Rhinos LTD, which develops RavenDB, a NoSQL Open Source Document Database.


time to read 6 min | 1027 words

A RavenDB customer called us with an interesting issue. Every now and then, RavenDB would stop processing any and all requests. These pauses could last for as long as two to three minutes and occurred on a fairly random, if frequent, basis.

A team of anteaters was dispatched to look at the issue (best bug hunters by far), but we couldn’t figure out what was going on. During these pauses, there was absolutely no activity on the machine. There was hardly any CPU utilization and no network or high I/O load. RavenDB was not responding to requests, but it was also not doing anything else. The logs just… stopped for that duration. That was something super strange.

We have seen similar pauses in the past, I’ll admit. Around 2014 / 2015 we had a spate of very similar issues, with RavenDB paused for a very long time. Those issues were all caused by the GC. At the time, RavenDB would do a lot of allocations and it wasn’t uncommon to end up with the majority of the time spent on GC cleanup. The behavior at that time, however, was very different. We could see high CPU utilization and all the metrics very clearly pointed to the GC as the culprit. In this case, absolutely nothing was going on.

Here is what such a pause looked like when we gathered the ETW metrics:

image004

Curiouser and curiouser, as Alice said.

This was a big instance, with quite a bit of work going on, so we spent some time analyzing the process behavior. And absolutely nothing appeared to be wrong. We finally figured out that the root cause is the GC, as you can see here:

image

The problem is that the GC is doing absolutely nothing here. For that matter, we spend an inordinate amount of time making sure that the GC won’t have much to do. I mentioned 2014/2015 earlier. As a direct result of those issues, we completely re-architected RavenDB. The database uses a lot less managed memory in general and is far faster. So what the hell is going on here? And why weren’t we able to see those metrics before? It took a lot of time to find this issue, and GC is one of the first things we check.

In order to explain the issue, I would like to refer you to the Book of the Runtime and the discussion of thread suspension. The .NET GC will eventually need to run a blocking collection. When that happens, it needs to ensure that the heap is in a known state. You can read the details in the book, but the short of it is that there are what is known as GC Safe Points. If the GC needs to run a blocking collection, all managed threads must be at a safe point. What happens if they aren’t, however? Well, the GC will let them run until they reach such a point. There is a whole machinery in place to make sure that this happens. I would also recommend reading the discussion here. And Konrad’s book is a great resource as well.

Coming back to the real issue, the GC cannot start until all the managed threads are at a safe point, so in order to suspend the threads, it will let them run to a safe point and suspend them there. What is a safe point? It is a bit complex, but the easiest way to think about it is that whenever there is a method call, the runtime ensures that the GC has stable information. The distance between method calls is typically short, so that is great. The GC is not likely to wait long for a thread to come to a safe point. And if there are loops that may take a while, the JIT will do the right thing to ensure that we won’t wait too long.

In this scenario, however, that was very much not the case. What is going on?

image

We got a page fault, which can happen anywhere, and until we return from the page fault, we cannot get to the GC Safe Point, so all the threads are suspended waiting for this page fault to complete.

And in this particular case, we had a single page fault, reading 16KB of data, that took close to two minutes to complete.

image

So the actual fault is somewhere in storage, which is out of scope for RavenDB, but a single slow write had a cascading effect to pause the whole server. The investigation continues and I’ll probably have another post on the topic when I get the details.
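To make the cascade concrete, here is a toy model of it in Python. It is not how the .NET runtime implements suspension. It only assumes what is described above: every thread must reach a safe point before the collection can begin, and a thread stuck in a page fault cannot reach one. All the names here (simulate_suspension, run_until_resumed, faulting_thread) are made up for the illustration, and a sleep stands in for the slow I/O.

```python
import threading
import time


def simulate_suspension(fault_seconds, healthy_threads=3):
    """Return how long the pretend GC waited for every thread to park."""
    suspend_requested = threading.Event()  # the "GC" asks all threads to stop
    resume = threading.Event()             # the "GC" lets them continue
    parked = threading.Semaphore(0)        # one release per thread at a safe point

    def run_until_resumed():
        while not resume.is_set():
            time.sleep(0.001)              # short stretch of work between safe points
            if suspend_requested.is_set():
                parked.release()           # reached a safe point: report and park
                resume.wait()

    def faulting_thread():
        time.sleep(fault_seconds)          # stand-in for a page fault on slow storage
        run_until_resumed()

    threads = [threading.Thread(target=run_until_resumed, daemon=True)
               for _ in range(healthy_threads)]
    threads.append(threading.Thread(target=faulting_thread, daemon=True))
    for t in threads:
        t.start()

    started = time.monotonic()
    suspend_requested.set()
    for _ in threads:
        parked.acquire()                   # cannot begin until everyone has parked
    waited = time.monotonic() - started

    resume.set()
    for t in threads:
        t.join()
    return waited


if __name__ == "__main__":
    print(f"no fault:    waited {simulate_suspension(0.0):.3f}s")
    print(f"0.5s fault:  waited {simulate_suspension(0.5):.3f}s")
```

The healthy threads park within a millisecond or so, but they then sit idle for as long as the single faulting thread takes. That matches the picture above of a machine with no CPU, network or I/O activity and nothing in the logs.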

For what it is worth, this is a “managed language” issue, but a similar scenario can happen when we are running in native code. A page fault while holding the malloc lock would soon lead to the same scenario (although I think that this would be easier to figure out).

I wanted to see if I could reproduce the same issue on my side, but ran into a problem. We don’t know what caused the slow I/O, and there is no easy way to simulate it on Windows. On the other hand, Linux has userfaultfd(), so I decided to use that.

There is no managed API for userfaultfd(), so I wrote something that should suffice for my (very limited) needs. With that, I can write the following code:

If you run this with: dotnet run -c release on a Linux system, you’ll get the following output:

139826051907584 about to access
Got page fault at 139826051907584
Calling GC...

And that would be… it. The system hangs. This confirms the theory, and is quite an interesting use of Linux features to debug a problem that happened on Windows.

time to read 10 min | 1949 words

For a database engine, controlling the amount of work that is being executed is a very important factor for the stability of the system. If we can’t control the work that is being done, it is quite easy to get to the point where we are overwhelmed.

Consider the case of a(n almost) pure computational task, which is completely CPU bound. In some cases, those tasks can easily be parallelized. Here is one such scenario:
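A minimal sketch of such a scenario, written in Python for brevity (RavenDB itself is C#). The function name is made up for the illustration:

```python
def sort_each_sequentially(array_of_arrays):
    """Sort every array in place, one after the other, on the calling thread."""
    for array in array_of_arrays:
        array.sort()  # pure CPU work; no I/O involved
    return array_of_arrays


if __name__ == "__main__":
    print(sort_each_sequentially([[3, 1, 2], [9, 7, 8]]))
```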

This example seems pretty obvious, right? This is completely CPU bound (sorting), and leaving aside that the sort itself can be done in parallel, we have many arrays that we need to sort. As you can imagine, I would much rather have this task done as quickly as possible.

One way to make this happen is to parallelize the work. Here is a pretty simple way to do that:

Parallel.ForEach(arrayOfArrays, array => Array.Sort(array));

A single one liner and we are going to see much better performance from the system, right?

Yep, this particular scenario will run faster. Depending on the sizes, it may be much faster. However, that single one liner is a nasty surprise for the stability of the system as a whole. What’s the issue?

Under the covers, this is going to use the .NET thread pool to do the work. In other words, this is going to be added to the global workload on the system. What else is using the same thread pool? Well, pretty much everything. For example, processing requests in Kestrel is done in the thread pool, all the async machinery uses the thread pool under the covers as well as pretty much everything else.

What is the impact of adding heavily CPU bound work to the thread pool, one may ask? Well, the thread pool would start executing these on its threads. This is heavy CPU work, so it will likely run for a while, displacing other work. If we consider a single instance of this code, there is going to be a limit on the amount of concurrent work that is placed in the thread pool. However, if we consider what happens when we run the code above in parallel… we are going to be placing a lot of work on the thread pool. That is going to effectively starve the rest of the system. The thread pool will react by spawning more threads, but this is a slow process, and it is easy to get into a situation where all available threads are busy, leaving nothing for the rest of the application to run.
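The starvation effect is easy to show with any shared pool. The sketch below uses Python’s ThreadPoolExecutor with a fixed size as a stand-in for the .NET thread pool (which, unlike this one, slowly grows under pressure). The job and request functions are hypothetical placeholders, and an event stands in for a long sort so that the outcome is deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def starvation_demo(pool_size=2):
    """Saturate a shared pool with long jobs, then submit a 'request' to it."""
    pool = ThreadPoolExecutor(max_workers=pool_size)
    release = threading.Event()
    started = threading.Semaphore(0)

    def cpu_heavy_job():
        started.release()
        release.wait()          # stand-in for a long running sort

    def handle_request():
        return "response"

    for _ in range(pool_size):
        pool.submit(cpu_heavy_job)
    for _ in range(pool_size):
        started.acquire()       # every pool thread is now busy

    request = pool.submit(handle_request)
    # The request sits in the queue: it is neither running nor done.
    was_starved = not request.running() and not request.done()

    release.set()               # the background work finally completes
    result = request.result(timeout=5)
    pool.shutdown()
    return was_starved, result


if __name__ == "__main__":
    print(starvation_demo())
```

The request itself is trivial, yet it cannot even start until one of the background jobs gives up its thread.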

From the outside, it looks like a 100% CPU status, with the system being entirely frozen. That isn’t actually what is going on, we are simply holding up all the threads and can’t prioritize the work between request handling (important) and speeding up background work (less important). The other problem is that you may be running the operation in an otherwise idle system, and the non parallel code will utilize a single thread out of the many that are available.

In the context of RavenDB, we are talking here about indexing work. It turns out that there is a lot of work here that is purely computational. Analyzing and breaking apart text for full text search, sorting terms for more efficient access patterns, etc. The same problem above remains, how can we balance the indexing speed and the overall workload on the system?

Basically, we have three scenarios that we need to consider:

  1. Busy processing requests – background work should be done in whatever free time the system has (avoiding starvation), with as little impact as possible.
  2. Busy working on many background tasks – concurrent background tasks should not compete with one another and step on each other’s toes.
  3. Busy working on a single large background task – which should utilize as much of the available computing resources that we have.

For the second and third options, we need to take into account that the fact that there isn’t any current request processing work doesn’t matter if there is incoming work. In that case, the system should prioritize the request processing over background work.

Another important factor that I haven’t mentioned is that it would be really good for us to be able to tell what work is taking the CPU time. If we are running a set of tasks on multiple threads, it would be great to be able to see what tasks they are running in a debugger / profiler.

This sounds very much like a class in operating systems. In fact, scheduling work is a core task for an operating system. The question we have here is how to build a system that would meet all the above requirements, given that we can’t actually schedule CPU work directly.

We cannot use the default thread pool, because there are too many existing users there that can interfere with what we want. For that matter, we don’t actually want to have a dynamic thread pool. We want to maximize the amount of work we do for computationally heavy workloads. Instead, for the entire process, we will define a set of threads to handle work offloading, like this:

This creates a set of threads, one for each CPU core on the machine. It is important to note that these threads are running with the lowest priority. If there is anything else that is ready to run, it will get priority. In order to do some background work, such as indexing, we’ll use the following mechanism:

Because indexing is a background operation in RavenDB, we do that in a background thread and we set the priority to below normal. Request processing threads are running at normal priority, which means that we can rely on the operating system to run them first. At the same time, the starvation prevention that already exists in the OS scheduler will run our indexing code even if there is extreme load on the system.

So far, so good, but what about those offload workers? We need a way to pass work from the indexing to the offload workers. This is done in the following manner:

Note that the _globalWorkQueue is process wide. For now, I’m using the simple sort an array example because it makes things easier. The real code would need a bit more complexity, but that is not important for explaining the concept. The global queue contains queues for each task that needs to be run.

The index itself will have a queue of work that it needs to complete. Whenever it needs to start doing the work, it will add that to its own queue and try to publish to the global queue. Note that the size of the global queue is limited, so we won’t use too much memory there.

Once we have published the work we want to do, the indexing thread will start working on the work itself, trying to solicit the aid of the other workers occasionally. Finally, once the indexing thread is done processing all the remaining work, we need to wait for any pending work that is currently being executed by the workers. That done, we can use the results.

The workers all run very similar code:

In other words, they pull a queue of tasks from the global tasks queue and start working on that exclusively. Once it is done processing a single index queue to completion, the offload worker will try to pick another from the global queue, etc.
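The pieces described so far can be sketched end to end. This is a Python approximation of the design, not RavenDB’s code: the class and function names (IndexWork, offload_worker, start_offload_workers, run_indexing_batch) and the queue size are assumptions made for the illustration, and the work items are the same sort an array example. Python’s threading module has no portable thread priority API, so the lowest / below normal priorities appear only as comments.

```python
import os
import queue
import threading

# Process wide and bounded; each entry is a whole per-index queue.
_global_work_queue = queue.Queue(maxsize=64)


class IndexWork:
    """The local queue of one indexing thread, plus completion tracking."""

    def __init__(self, arrays):
        self.items = queue.SimpleQueue()
        for array in arrays:
            self.items.put(array)
        self._pending = len(arrays)
        self._lock = threading.Lock()
        self.done = threading.Event()
        if not arrays:
            self.done.set()

    def drain(self):
        """Process items until the local queue is empty."""
        while True:
            try:
                array = self.items.get_nowait()
            except queue.Empty:
                return
            array.sort()
            with self._lock:
                self._pending -= 1
                if self._pending == 0:
                    self.done.set()


def offload_worker():
    # In the design above this thread would run at the lowest priority.
    while True:
        work = _global_work_queue.get()  # blocks until an index asks for help
        work.drain()                     # stay on this one queue until it is empty


def start_offload_workers(count=None):
    for i in range(count or os.cpu_count() or 1):
        threading.Thread(target=offload_worker, daemon=True,
                         name=f"offload-{i}").start()


def run_indexing_batch(arrays, helpers=4):
    """Called from the indexing thread (below normal priority in the design)."""
    work = IndexWork(arrays)
    for _ in range(helpers):             # register the same queue several times
        try:
            _global_work_queue.put_nowait(work)
        except queue.Full:
            break                        # no room; we will do the work ourselves
    work.drain()                         # the indexing thread works on its own queue
    work.done.wait()                     # wait for items still held by the helpers
    return arrays


if __name__ == "__main__":
    start_offload_workers()
    print(run_indexing_batch([[3, 1, 2], [6, 5, 4]]))
```

The sketch works whether or not any offload workers were ever started, because the caller always drains its own queue.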

The whole code is small and fairly simple, but there is a lot of behavior hidden here. Let me try to explore all of that now.

The indexing background work pushes all the work items to its local queue, and it will register the queue itself in the global queue for the offloading threads to process. The indexing queue may be registered multiple times, so multiple offloading threads will take part in this. The indexing code, however, does not rely on that and will also process its own queue. The idea is that if there are offloading threads available, they will help, but we do not rely on them.

The offloading thread, for its part, will grab an indexing thread queue and start processing all the items from the queue until it is done. For sorting arbitrary arrays, it doesn’t matter much, but for other workloads, we’ll likely get much better locality in terms of task execution by issuing the same operation over a large set of data.

The thread priority here is also important, mind. If there is nothing else to do, the OS will schedule the offloading threads and give them work to do. If there are a lot of other things happening, they will not be scheduled often. This is fine, we are being assisted by the offloading threads, they aren’t mandatory.

Let’s consider the previous scenarios in light of this architecture and its impact.

If there are a lot of requests, the OS is going to mostly schedule the request processing threads, the indexing threads will also run, but it is mostly going to be when there is nothing else to do. The offload threads are going to get their chance, but mostly that will not have any major impact. That is fine, we want most of the processing power to be on the request processing.

If there is a lot of work, on the other hand, the situation is different. Let’s say that there are 25 indexes running and there are 16 cores available on the machine. In this case, the indexes are going to compete with one another. The offloading threads are again not going to get much of a chance to run, because adding more threads in this context isn’t going to help. There is already competition between the indexing threads for the CPU resources. However, the offloading threads are going to be of some help. Indexing isn’t pure computation, there are enough cases where it needs to do I/O, in which case, the CPU core is available. The offloading threads can then take advantage of the free cores (if there aren’t any other indexing threads that are ready to run instead).

It is in the case that there is just a single task running on a mostly idle system that this really shines. The index is going to submit all its work to the global queue, at which point, the offloading threads will kick in and help it to complete the task much sooner than it would be able to otherwise.

There are other things that we need to take into account in this system:

  • It is, by design, not fair. An index that is able to put its queue into the global queue first may have all the offloading threads busy working on its behalf. All the other indexes are on their own. That is fine, when that index completes all its work, the offloading threads will assist another index. We care about overall throughput, not fairness between indexes.
  • There is an issue with this design under certain loads. An offloading thread may be busy doing work on behalf of an index. That index completed all other work and is now waiting for the last item to be processed. However, the offloading thread has the lower priority, so will rarely get to run. I don’t think we’ll see a lot of costs here, to be honest, that requires the system to be at full capacity (rare, and admins usually hate that, so prevent it) to actually be a real issue for a long time. If needed, we can play with thread priorities dynamically, but I would rather avoid it.

This isn’t something that we have implemented, rather something that we are currently playing with. I would love to hear your feedback on this design.

time to read 3 min | 509 words

NewsBlur (a personal news aggregator) suffered from a data breach / ransomware “attack”. I’m using the term “attack” here in quotes because this is the equivalent to having your car broken into after you left it with the engine running with the keys inside in a bad part of town.

As a result of the breach, users’ data, including personal RSS feeds, access tokens for social media, email addresses and other sundry items of various import, was exposed. It looks like about 250GB of data was taken hostage, by the way.

The explanation about what exactly happened is really interesting, however.

NewsBlur moved their MongoDB instance from its own server to a container. Along the way, they accidentally (it looks like a Docker default configuration) opened the MongoDB port to the whole wide world. By default, MongoDB will only listen on localhost. In this case, I think that from the perspective of MongoDB, it was listening only on the local port, and it was the Docker infrastructure that did the port forwarding and tied the public port to the instance. From that point on, it was just a matter of time. It apparently took two hours or so for some automated script to run into the welcome mat and jump in, wreak havoc and move on.

I’m actually surprised that it took so long. In some cases, machines are attacked in under a minute from showing up on the public internet. I used the term bad part of town earlier, but it is more accurate to say that the entire internet is a hostile environment and should be treated as such.

That leads to the next problem. You should never assume that you are running anywhere else. In the case above, we have NewsBlur assuming that they are running on a private network that only the internal servers can access. About a year ago, Microsoft had a similar issue, they exposed an Elastic cluster that was supposed to be on an internal network only and lost 250 million customer support records.

In both cases, the problem was lack of defense in depth. Once the attacker was able to connect to the system, it was game over. There are monitoring solutions that you can use, but in general, the idea is that you don’t trust your network. You authenticate and encrypt all the traffic, regardless of where you are running it. The additional encryption cost is not usually meaningful for typical workloads (even for demanding workloads), given that most CPUs have dedicated encryption instructions.

When using RavenDB, we have taken the steps to ensure that:

  • It is simple and easy to run in a secure mode, using X509 client certificates for authentication and encrypting all network communications.
  • It is hard and complex to run without security.

If you run the RavenDB setup wizard, it takes under two minutes to end up with a secured solution, one that you can expose to the outside world and not worry about your data taking a walk.

time to read 4 min | 714 words

A RavenDB database can reside on multiple nodes in the cluster. RavenDB uses a multi master protocol to handle writes. Any node holding the database will be able to accept writes. This is in contrast to other databases that use the leader/follower model. In such systems, only a single instance is able to accept writes at any given point in time.

The node that accepted the write is responsible for disseminating the write to the rest of the cluster. This should work even if there are some breaks in communication, mind, which makes things more interesting.

Consider the case of a write to node A. Node A will accept the write and then replicate that as part of its normal operations to the other nodes in the cluster.

In other words, we’ll have:

  • A –> B
  • A –> C
  • A –> D
  • A –> E

In a distributed system, we need to be prepared for all sorts of strange failures. Consider the case where node A cannot talk to node C, but the other nodes can. In this scenario, we still expect node C to have the full data, even if node A cannot send the data to it directly.

The simple solution would be to have each node replicate the data it gets from any source to all its siblings. However, consider the cost involved:

  • Write to node A (1KB document) will result in 4 replications (4KB).
  • Node B will replicate to 4 nodes (including A, mind), so that is another 4KB.
  • Node C will replicate to 4 nodes, so that is another 4KB.
  • Node D will replicate to 4 nodes, so that is another 4KB.
  • Node E will replicate to 4 nodes, so that is another 4KB.

In other words, in a 5 nodes cluster, a single 1KB write will generate 20KB of network traffic, the vast majority of it totally unnecessary.
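The arithmetic generalizes: with n nodes, naive flooding sends the document n × (n − 1) times, while only n − 1 copies are actually needed. A small sketch with made up function names:

```python
def flooding_traffic_kb(nodes, document_kb):
    """Every node forwards the document to each of its siblings."""
    return nodes * (nodes - 1) * document_kb


def minimal_traffic_kb(nodes, document_kb):
    """Only the node that accepted the write sends it, once per sibling."""
    return (nodes - 1) * document_kb


if __name__ == "__main__":
    print(flooding_traffic_kb(5, 1), minimal_traffic_kb(5, 1))
```

For the 5 node cluster and 1KB document above, that is 20KB against 4KB, and the gap widens quadratically as the cluster grows.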

There are many gossip algorithms, and they are quite interesting, but they are usually not meant for a continuous stream of updates. They focus on robustness over efficiency.

RavenDB takes the following approach. When a node accepts a write from a client directly, it will send the new write to all its siblings immediately. However, if a node accepts a write from replication, the situation is different. We assume that the node that replicated the document to us will also replicate the document to the other nodes in the cluster. As such, we’ll not initiate replication immediately. What we’ll do instead is let all the nodes that replicate to us know that we got the new document.

If we don’t have any writes on the node, we’ll check every 15 seconds whether we have documents that aren’t present on our siblings. Remember that the siblings will report to us what documents they currently have, proactively. There is no need to chat over the network about that.

In other words, during normal operations, what we’ll have is node A replicating the document to all the other nodes. They’ll each inform the other nodes that they have this document and nothing further needs to be done. However, in the case of a break between node A and node C, the other nodes will realize that they have a document that isn’t on node C, in which case they’ll complete the cycle and send it to node C, healing the gap in the network.

I’m using the term “tell the other nodes what documents we have”, but that isn’t what is actually going on. We use change vectors to track the replication state across the cluster. We don’t need to send each individual document write to the other side, instead, we can send a single change vector (a short string) that will tell the other side all the documents that we have in one shot.  You can read more about change vectors here.

In short, the behavior on the part of the node is simple:

  • On document write to the node, replicate the document to all siblings immediately.
  • On document replication, notify all siblings about the new change vector.
  • Every 15 seconds, replicate to siblings the documents that they missed.

Just these rules allow us to have a sophisticated system in practice, because we’ll not have excessive writes over the network but we’ll bypass any errors in the network layer without issue.
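To see the three rules heal a broken link, here is a small in-memory simulation in Python. It is a sketch under simplifying assumptions, not RavenDB’s replication code: plain sets of document ids stand in for change vectors, tick() stands in for the 15 second timer, and the Cluster class and its members are names invented for the example.

```python
class Cluster:
    def __init__(self, names, broken_links=()):
        self.docs = {n: set() for n in names}
        # known[a][b]: the documents that node a believes node b already has
        self.known = {a: {b: set() for b in names if b != a} for a in names}
        self.broken = {frozenset(link) for link in broken_links}
        self.documents_sent = 0

    def _reachable(self, a, b):
        return frozenset((a, b)) not in self.broken

    def _send(self, src, dst, doc):
        if not self._reachable(src, dst):
            return
        self.documents_sent += 1
        self.known[src][dst].add(doc)
        self.known[dst][src].add(doc)
        if doc not in self.docs[dst]:
            self.docs[dst].add(doc)
            # Rule 2: on replication, only notify the siblings of what we have.
            for other in self.known[dst]:
                if self._reachable(dst, other):
                    self.known[other][dst].add(doc)

    def client_write(self, node, doc):
        # Rule 1: a direct write is replicated to all siblings immediately.
        self.docs[node].add(doc)
        for sibling in self.known[node]:
            self._send(node, sibling, doc)

    def tick(self):
        # Rule 3: periodically send siblings whatever they appear to be missing.
        for node in self.docs:
            for sibling, seen in self.known[node].items():
                for doc in sorted(self.docs[node] - seen):
                    self._send(node, sibling, doc)


if __name__ == "__main__":
    cluster = Cluster("ABCDE", broken_links=[("A", "C")])
    cluster.client_write("A", "doc-1")
    print(sorted(cluster.docs["C"]))   # C has nothing yet
    cluster.tick()
    print(sorted(cluster.docs["C"]), cluster.documents_sent)
```

In this run, node C gets the document from one of its reachable siblings on the first tick, and the document crosses the network four times in total, which is the minimum for five nodes.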

time to read 5 min | 993 words

I mentioned in a previous post that I’m teaching a Cloud Computing course at university. That led to some good questions that I have to field about established wisdom, questions that I have to really think about. One such question that I ran into was about the intersection of databases and the cloud.

One of the most important factors for database performance is the I/O rate that you can get. Let’s take a fairly typical off the shelf drive, shall we?

The cost of the drive is less than 500$ US for a 2TB disk. It can write at close to 5GB/sec, with sustained writes sitting at 3GB/sec on User Benchmark, and it is also rated to hit 1 million IOPS. That is a lot. And that is when you spend less than 500$ on it.

On the other hand, a comparable drive would be Azure P40, which costs 235.52$ per month for 2TB of disk space. It also offers a stunning rate of 7,500 IOPS (with bursts of 30,000!). The write rate is 250MB/sec with bursts of 1GB/sec. The best you can get on Azure, though, is an Ultra disk, where a disk comparable to the on premise option would cost you literally thousands per month (and would offer about a tenth of the performance).

In other words, the cloud option is drastically more costly. To be fair, we aren’t comparing the same thing at all. A cloud disk is more than just renting of the hardware. There is redundancy to consider, the ability to “move” the disk between instances, the ability to take snapshots and restore, etc.

A more comparable scenario would be to look at NVMe instances. If we take an L8sv2 instance on Azure, that gives us a 2TB NVMe drive with 400,000 IOPS and 2GB/sec throughput. That is at least within reach of the off the shelf disk I pointed out before. The cost? About 500$ per month. But now we are talking about a machine that has 8 cores and 64 GB of RAM.

The downside of NVMe instances is that the disks are transient. If there is a failure that requires us to stop and start the machine (basically, moving hosts), that would mean that the data is lost. You can reboot the machine, but not stop the cloud allocation of the machine without losing the data.

The physical hardware option is much cheaper, it seems. If we add everything around the disk, we are going to get somewhat different costs. I found a similar server to L8sv2 on Dell for about 7,000 $ US, for example. Pretty sure that you can get it for less if you know what you are doing, but it was my first try and it included 3.2 TB of enterprise grade NVMe drives.

Colocation pricing can run about 100$ a month (again, first search result, you can get for less) and that means that the total monthly cost is roughly 685$. That is comparable to the cloud, actually, but doesn’t account for the fact that you can use the same server for much longer than a single year. It is also probably wasting a lot of money on bigger hardware. What you don’t get, which you probably want, is the envelope around that. The ability to say: “I want another server” (or ten), the ability to move and manage your resources easily, etc. And that is as long as you are managing just hardware resources.

You don’t get any of the services or the expertise in running things. Given that even professional organizations can suffer devastating issues, you want to have an expert manage that, because an amateur handling that topic leads to problems.

A lot of the attraction of the cloud comes from a very simple reason. I don’t want to deal with all of that stuff. None of that is your competitive advantage and you would rather just pay and not think about that. The key for the success of the cloud is that globally, you are paying less (in time, effort and manpower) than taking the cost of managing it yourself.

There are two counterpoints here, though.

  • At some scale, it would make sense to move out from the cloud to your own hardware. Dropbox did that at some point, moving some of its infrastructure off the cloud to savings of over 75 million dollars. You don’t have to be at Dropbox size to benefit from having some of your own servers, but you do need to hit some tipping point before that would make sense.
  • StackOverflow is famously running on their own hardware, and is able to get great results out of that. I wonder how much the age of StackOverflow has to do with that, though.

The cloud is a pretty good abstraction, but it isn’t one that you get for free. There are a lot of scenarios where it makes a lot of sense to have some portions of your system outside of the cloud. The default of “everything is in the cloud”, however, makes a lot of sense. Specifically because you don’t need to do complex (and costly) sizing computations. Once you have the system running and the load figured out, you can decide if it makes sense to move things to your own servers.

And, of course, this all assumes that we are talking about just the hardware. That is far from the case in today’s cloud. Cloud services are another really important aspect of what you get in the cloud. Consider the complexity of running a Kubernetes cluster, or setting up a system for machine vision or distributed storage or any of the things that the cloud providers have commoditized.

The decision of cloud usage is no longer a simple buy vs. rent but a much more complex one about where do you draw the line of what should be your core concerns and what should be handled outside of your purview.

time to read 2 min | 325 words

Yesterday I asked about dealing with liveness detection of nodes running in AWS. The key aspect is that this needs to be simple to build and easy to explain.

Here are a couple of ways that I came up with, nothing ground breaking, but they do the work while letting someone else do all the heavy lifting.

Have a well known S3 bucket that each of the nodes will write an entry to. The idea is that we’ll have something like (filename –  value):

  • i-04e8d25534f59e930 – 2021-06-11T22:01:02
  • i-05714ffce6c1f64ad – 2021-06-11T22:00:49

The idea is that each node will scan the bucket and read through each of the files, getting the last seen time for all the nodes. We’ll consider all the nodes whose timestamp is within the last 1 minute to be alive and any other node is dead.  Of course, we’ll also need to update the node’s file on S3 every 30 seconds to ensure that other nodes know that we are alive.

The advantage here is that this is trivial to explain and implement and it can work quite well in practice.
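The logic fits in a few lines of Python. In the sketch below, a plain dict stands in for the S3 bucket so that the example runs anywhere. In AWS, the write would be a put of a small object named after the instance id and the read would list the bucket and fetch each object (with boto3, for instance, that would be put_object, list_objects_v2 and get_object). The function names and the one minute window constant are assumptions for the illustration.

```python
from datetime import datetime, timedelta

ALIVE_WINDOW = timedelta(minutes=1)
STAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"


def record_heartbeat(bucket, node_id, now):
    """Each node calls this for itself every 30 seconds."""
    bucket[node_id] = now.strftime(STAMP_FORMAT)


def alive_nodes(bucket, now):
    """Scan every entry and keep the nodes seen within the last minute."""
    alive = []
    for node_id, stamp in bucket.items():
        last_seen = datetime.strptime(stamp, STAMP_FORMAT)
        if now - last_seen <= ALIVE_WINDOW:
            alive.append(node_id)
    return sorted(alive)


if __name__ == "__main__":
    bucket = {}
    record_heartbeat(bucket, "i-04e8d25534f59e930", datetime(2021, 6, 11, 22, 1, 2))
    record_heartbeat(bucket, "i-05714ffce6c1f64ad", datetime(2021, 6, 11, 22, 0, 49))
    print(alive_nodes(bucket, datetime(2021, 6, 11, 22, 1, 55)))
```

One thing to keep in mind with this approach is that it compares timestamps written by different machines, so it quietly assumes that the nodes’ clocks are reasonably in sync.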

The other option is to actually piggy back on top of the infrastructure that is dedicated for this sort of scenario. Create an elastic load balancer and set up a target group. On startup, the node will register itself to the target group and set up the health check endpoint. From this point on, each node can ask the target group to find all the healthy nodes.

This is pretty simple as well, although it requires significantly more setup. The advantage here is that we can detect more failure modes (a node that is up, but firewalled away, for example).

Other options, such as having the nodes ping each other, are actually quite complex since they need to find each other. That leads to some level of service locator, but then you’ll have to avoid each node pinging all the other nodes, since that can get busy on the network.

time to read 2 min | 286 words

I’m teaching a course at university about cloud computing. That can be a lot of fun, but quite frustrating at times. The key issue for me is that I occasionally need to provide students with some way to do something that I know how to do properly, but I can’t.

Case in point, assuming that I have a distributed cluster of nodes, and we need to detect what nodes are up or down, how do you do that?

With RavenDB, we assign an observer to the cluster whose job is to do health monitoring. I can explain that to the students, but I can’t expect them to utilize this technique in their exercises, there is too much detail there. The focus of the lesson or exercise is not to build a distributed system but to make use of one, after all.

As a rule, I try to ensure that all projects that we are working on can be done in under 200 lines of Python code. That puts a hard limit to the amount of behavior I can express. Because of that, I find myself looking for ways to rely on existing infrastructure to deal with the situation. 

Each node is running the same code, and they are set up so they can talk to one another, if needed. It is important that all the live nodes will converge to agree on the active nodes in relatively short order.

The task is to find the list of active nodes in a cluster, where nodes may go up or down dynamically. We are running in the AWS cloud, so you can use its resources. How would you do that?

The solution should be as simple as possible and easy to explain to students.

time to read 5 min | 885 words

In the database field and information retrieval in general, there is a very common scenario. I have a list of (sorted) integers that I want to store, and I want to do that in as efficient a manner as possible. There are dozens of methods to do this and this is a hot topic for research. This is so useful because there are so many places where you can operate on a sorted integer list and gain massive benefits. Unlike generic compression routines, we can usually take advantage of the fact that we understand the data we are trying to work with and get better results.

The reason I need to compress integers (actually, int64 values) is that I’m trying to keep track of matches for some data, so the integers that I’m tracking are actually file offsets for users’ data inside of Voron. That leads to a few different scenarios that I have to deal with:

  • There is a single result
  • There is a reasonable number of results
  • There is a boatload of results

I’m trying to figure out the best way to store the latter two options in as efficient a manner as possible.

The first stop was Daniel Lemire’s blog, naturally, since he has written about this extensively. I looked at the following schemes: FastPFor and StreamVByte. They have somewhat different purposes, but basically, FastPFor uses a bit stream while StreamVByte uses a byte-oriented mode. Theoretically speaking, you can get a better compression rate from FastPFor, but StreamVByte is supposed to be faster. Another integer compression system comes from the Gorilla paper from Facebook. That is a bigger scheme, which includes compression of time series values. However, part of that scheme talks about how you can compress integers (they use that to store the ticks of a particular operation). We are actually using that for the time series support inside of RavenDB.

I’m not going to cover that in depth. Here is the paper on Gorilla compression; the relevant description is in section 4.1.1. Suffice to say that they are using a bit stream and a delta-of-deltas computation. Basically, if you keep getting values that are the same distance apart, you don’t need to record every value, since you can compute it from the previous ones. In the best case scenario, Gorilla compression needs a single bit per value, assuming the results are evenly spaced.
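To make the delta-of-deltas idea concrete, here is a small Python sketch. It only shows the arithmetic, not the bit-level encoding from the paper, and the function name and sample values are made up for illustration.

```python
def delta_of_deltas(values):
    """Return the second-order differences of a sorted list of integers."""
    deltas = [b - a for a, b in zip(values, values[1:])]
    return [b - a for a, b in zip(deltas, deltas[1:])]


# Evenly spaced values: every delta is 60, so every delta of deltas is 0.
evenly_spaced = [100, 160, 220, 280, 340]
print(delta_of_deltas(evenly_spaced))  # [0, 0, 0]

# Irregular spacing: the deltas are 60, 65, 55, 120, so the differences must be stored.
irregular = [100, 160, 225, 280, 400]
print(delta_of_deltas(irregular))  # [5, -10, 65]
```

A run of zeros is what makes the single bit per value possible. Any non-zero entry has to be written out in full, along with some marker saying how wide it is.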

For my purposes, I want to get as high a compression rate as possible, and I need to store just the list of integers. The problem with Gorilla compression is that if we aren’t getting numbers that are the same distance apart, we need to record how much they differ. That means that we’ll need a minimum of 9 bits per value. That adds up quickly, sadly.

On the other hand, with PFor, there is a different system. PFor computes the maximum number of bits required for a batch of integers, and then records just those bits for each value. I found the Binary Packing section (2.6) in this paper to be the clearest explanation of how that works exactly. The problem with PFor, however, is that if you have a single large outlier, you’ll waste a lot of bits unnecessarily.
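The cost of an outlier is easy to see with a bit of Python. This is a sketch of plain fixed-width packing only (one bit width for the whole batch), with made-up helper names and sample values. It is not the FastPFor implementation.

```python
def bits_needed(batch):
    """Bit width required to store every value in the batch at one fixed width."""
    return max((v.bit_length() for v in batch), default=0)


def packed_size_bits(batch):
    """Total payload size when every value is stored using the widest value's width."""
    return bits_needed(batch) * len(batch)


small = [12, 7, 15, 9, 3, 14, 8, 11]
with_outlier = small[:-1] + [1_000_000]

print(packed_size_bits(small))         # 8 values * 4 bits = 32
print(packed_size_bits(with_outlier))  # 8 values * 20 bits = 160
```

One large value makes every other value in the batch five times more expensive to store.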

I decided to see if I can do something about that and created an encoder that works on batches of 128 integers at a time. This encoder will:

  • Check the maximum number of bits required to record the deltas of the integers. That alone already saves us a lot.
  • Then we check the top and bottom halves of the batch, to see if we’ll get a benefit from recording them separately. A single large value (or a group of them) that is localized to a part of the batch will be recorded independently in this case.
  • Finally, instead of only recording the meaningful bit ranges, we’ll also analyze the batch we get further. The idea is to try to find ranges within the batch that have the same distance from one another. We can encode those as repetitions instead of each independent value. That can end up saving a surprising amount of space.
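The three steps above can be sketched roughly like this in Python. This is not the actual encoder, just a cost estimate under my own assumptions: `HEADER_BITS` is an invented per-segment overhead for recording the bit width, and all the function names are hypothetical.

```python
HEADER_BITS = 8  # assumed cost of recording one segment's bit width (not from the real encoder)


def deltas(values):
    """Differences between consecutive sorted values."""
    return [b - a for a, b in zip(values, values[1:])]


def packed_cost(ds):
    """Bits needed to store the deltas at the width of the largest one, plus a header."""
    width = max((d.bit_length() for d in ds), default=0)
    return HEADER_BITS + width * len(ds)


def best_split_cost(ds):
    """Compare packing the whole batch against packing each half on its own."""
    mid = len(ds) // 2
    return min(packed_cost(ds), packed_cost(ds[:mid]) + packed_cost(ds[mid:]))


def runs(ds):
    """Collapse repeated deltas into [delta, count] pairs."""
    out = []
    for d in ds:
        if out and out[-1][0] == d:
            out[-1][1] += 1
        else:
            out.append([d, 1])
    return out


# 128 sorted values: 64 spaced by 10, then a big jump, then 64 spaced by 3.
values = list(range(0, 640, 10)) + [1_000_000 + i * 3 for i in range(64)]
ds = deltas(values)

print(packed_cost(ds))      # 2548: the single large jump forces 20 bits on all 127 deltas
print(best_split_cost(ds))  # 1548: the half without the jump needs only 4 bits per delta
print(runs(ds))             # [[10, 63], [999370, 1], [3, 63]]
```

In this example the split saves about 40% over a single bit width, and the run detection reduces 127 deltas to three pairs.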

You can look at the results of my research here. I’ll caution you that this is raw, but the results are promising. I’m able to beat (in terms of compression rate) the standard PFor implementation by a bit, with a lot less code.

I’m also looking at a compression rate of 30% – 40% for realistic data sets. And if the data is set up just right and I’m able to take advantage of the repeated delta optimization, I can pack things real tight.

Current numbers say that I can push upward of 10,000 int64 values into an 8KB buffer without any repeated deltas. It goes to just under 500,000 int64 values in an 8KB buffer if I can take full advantage of the deltas.

The reason I mention the deltas so often is that it is very likely that I’ll record values that are roughly the same size, so we’ll get offsets that are the same distance from one another. In that case, my encoder goes to town and the compression rate is basically crazy.

This is a small piece of a much larger work, but this is the first time in a while that I got to code at Voron’s level. This is fun.

time to read 3 min | 438 words

RavenDB has been using the Raft protocol for the past several years. In fact, we have written three or four different implementations of Raft along the way. I implemented Raft using pure message passing, on top of async RPC and on top of TCP. I did that using the actor model and using direct parallel programming, as well as the usual spaghetti mode.

The Raft paper is beautiful in how it explains a non-trivial problem in a way that is easy to grok, but it is also something that can require dealing with a number of subtleties. I want to discuss some of the ways to successfully implement it. Note that I’m assuming that you are familiar with Raft, so I won’t explain the protocol itself here.

A key problem with Raft implementations is that you have multiple concurrent things happening all at once, on different machines. And you always have the election timer waiting in the background. In order to deal with that, I divide the system into independent threads that each have their own task.

I’m going to talk specifically about the leader mode, which is the most complex aspect, usually. In this mode, we have:

  • Leader thread – responsible for determining the current progress in the cluster.
  • Follower thread – one per follower – responsible for communicating with a particular follower.

In addition, we may have values being appended to our log concurrently with all of the above. The key here is that each follower thread will communicate with its follower and push data to it. The overall structure for a follower thread looks like this:
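What follows is a rough sketch of that structure in Python, not the production code. The names are hypothetical: `send_append_entries` stands in for whatever transport you use, `new_entries` is a per-follower `threading.Event` set by whoever appends to the log, and `notify_leader` is how this thread tells the leader it made progress.

```python
import threading

MAX_BATCH = 50  # up to 50 entries per AppendEntries message


def follower_loop(log, send_append_entries, new_entries, notify_leader,
                  stop, election_timeout=0.3):
    """Keep a single follower up to date until `stop` is set."""
    heartbeat = election_timeout / 3
    next_index = 0  # number of log entries this follower has acknowledged
    while not stop.is_set():
        batch = log[next_index:next_index + MAX_BATCH]
        if not batch:
            # Nothing to send: wait for new entries, or time out and send a heartbeat.
            new_entries.wait(timeout=heartbeat)
            new_entries.clear()
            batch = log[next_index:next_index + MAX_BATCH]
        if send_append_entries(batch):  # an empty batch acts as the ping
            next_index += len(batch)
            if batch:
                notify_leader(next_index)  # signal that some work was done
        else:
            # Crude back-off; a real implementation would also rewind next_index
            # when the follower rejects the entries.
            stop.wait(heartbeat)
```

Each follower gets its own thread, started with something like `threading.Thread(target=follower_loop, args=(...))`.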

What is the idea? We have a dedicated thread that will communicate with the follower. It will either ping the follower with an empty AppendEntries (every 1/3 of the election timeout) or it will send a batch of up to 50 entries to update the follower. Note that there is nothing in this code about the machinery of Raft; that isn’t the responsibility of the follower thread. The leader, on the other hand, listens to the notifications from the follower threads, like so:
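Again, a rough Python sketch with hypothetical names rather than the production code. I’m assuming the follower threads push `(follower, index)` pairs onto a `queue.Queue`, that `leader_index` is a callable returning the current length of the leader’s log, and that `None` on the queue means shut down. It also skips the Raft rule that a leader only counts replicas for entries from its own term.

```python
import queue


def quorum_index(leader_index, follower_indexes):
    """Highest log index stored on a majority of the cluster (leader included)."""
    indexes = sorted([leader_index, *follower_indexes], reverse=True)
    return indexes[len(indexes) // 2]


def leader_loop(notifications, followers, leader_index, on_commit):
    """React to progress reports from follower threads and advance the commit index."""
    progress = {name: 0 for name in followers}
    committed = 0
    while True:
        item = notifications.get()  # blocks until some follower thread reports work
        if item is None:            # sentinel: shut down
            return committed
        follower, index = item
        progress[follower] = index
        candidate = quorum_index(leader_index(), progress.values())
        if candidate > committed:
            committed = candidate
            on_commit(committed)
```

The leader never talks to a follower directly here. It only recomputes the cluster state whenever a follower thread reports in.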

The idea is that each aspect of the system is running independently, and the only communication that they have with each other is the fact that they can signal the other that they did some work. We then can compute whether that work changed the state of the system.

Note that the code here is merely a draft and is missing many details. For example, we aren’t sending the last commit index on AppendEntries, and committing the log is an asynchronous operation, since it can take a long time and we need to keep the system in operation.

time to read 2 min | 316 words

I ran into this question on Hacker News, asking for the best computer science papers. There are a few that I keep coming back to, either because they are so fundamental or because they are so useful.

In no particular order:

  • The Raft Paper – a distributed consensus algorithm that made sense to me on first read. There are a lot of subtle issues to consider, but when reading the paper, everything clicked. That is head and shoulders above what Paxos literature is about.
  • The Ubiquitous BTree – talk about a paper that I use daily. Admittedly, I didn’t get started on BTrees from this paper, but this is a very well written one and it does a great job presenting the topic. It is also from 1979, and BTrees were already “ubiquitous” at that time, which tells us something.
  • Extendible Hashing – this is also from 1979, and it is well written. I implemented extendible hashing based on this article directly and I grokked it right away.
  • How Complex Systems Fail – not strictly a computer science paper. In fact, I’m fairly certain that this fits more into civil engineering, but it does an amazing job of explaining the internals of complex systems and why and how they fail. I took a lot from this paper. It is also very short and highly readable.
  • OLTP Through the Looking Glass – discusses the internal structure of database engines and the cost and complexities of their various pieces.
  • You’re doing it wrong – discusses the implementation of the Varnish proxy from the point of view of a kernel hacker. Totally different approach to the design of the system. Had a lot of influence on how I build systems.

I’m fairly certain that my criteria won’t be yours, but those are all papers that I have read multiple times and whose insights I use in my daily work.
