Optimizing local and distributed transactions with batching
I got into a good discussion about how RavenDB implements some optimizations with transaction handling. The details got big enough (and hopefully interesting enough) that they warrant their own post.
When we are talking about transactions, one of the key factors in the cost of a transaction is the amount of time that it takes to persist it. This is different for local and distributed transactions.
For a local transaction, we can consider the transaction committed if it is durably stored on the disk.
For a distributed transaction, we can consider the transaction committed if it is durably stored on a majority of the disks in the cluster.
That factor right there is the primary reason why a distributed transaction is more expensive. For a local transaction, you are waiting for the disk. For a distributed transaction, you are waiting for multiple disks and the network.
One of the core optimizations for speeding up transactions is the ability to batch things. The cost of writing to the disk is more or less the same regardless of how much you write (within an order of magnitude or so). In other words, writing 8 KB and writing 32 KB have pretty much the same cost. Writing 1 MB and writing 100 MB do not, but writing 1 MB vs 4 MB isn't meaningfully different (for a sequential durable write, which is what we care about in the case of transactions).
The point of this post is how this is actually handled. RavenDB utilizes a process called transaction merging to reduce the number of times that we have to go to the disk. Concurrent transactions will be bundled into the same write call, massively increasing our throughput. To give you some context, without transaction merging you peak at a few hundred transactions per second. With transaction merging, you can jump to high thousands of transactions per second. Here is how this works:
RavenDB actually takes this further: in addition to transaction merging, we also apply something we call async commit. Take a look at the following timeline:
A transaction is actually composed of two separate steps. First we need to execute whatever commands we have in the transaction, then we have to write the transaction changes to disk.
RavenDB is able to start processing the next transaction as soon as the previous one started the write to the disk. The idea is to parallelize compute and I/O, and we are able to benefit greatly as a result. Note that this is safe to do, since the next transaction won’t be committed until the prior transaction has been durably stored.
How does this work in practice? Whenever we have a new transaction, we add it to a queue. A dedicated thread will pull those transactions from the queue and merge them, running the separate transactions as one big operation. When we run out of pending transactions or hit certain size / timing limits, we'll commit the merged transaction and start working on the next one while the commit is completing in the background.
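To make that shape concrete, here is a minimal sketch of such a merger loop. This is not RavenDB's actual code; the queue layout, the WriteToDiskDurably stub, and the batch limit are all illustrative assumptions:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class TransactionMerger
{
    // Each pending transaction carries a TaskCompletionSource so the caller
    // can await durability of its own operation.
    private readonly BlockingCollection<(Action Op, TaskCompletionSource<bool> Done)> _pending = new();
    private const int MaxBatchSize = 1024;

    public Task EnqueueAsync(Action op)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending.Add((op, tcs));
        return tcs.Task;
    }

    // Runs on a dedicated thread.
    public void MergerLoop()
    {
        Task previousCommit = Task.CompletedTask;
        while (true)
        {
            // Block for the first transaction - no artificial delay when idle.
            var batch = new List<(Action Op, TaskCompletionSource<bool> Done)> { _pending.Take() };

            // Merge whatever else queued up in the meantime, up to a limit.
            while (batch.Count < MaxBatchSize && _pending.TryTake(out var next))
                batch.Add(next);

            foreach (var (op, _) in batch)
                op(); // execute the merged transactions as one big operation

            // Async commit: this batch's compute overlapped the previous
            // batch's I/O; only now do we wait for that I/O to finish.
            previousCommit.Wait();
            previousCommit = Task.Run(() => WriteToDiskDurably(batch));
        }
    }

    private static void WriteToDiskDurably(List<(Action Op, TaskCompletionSource<bool> Done)> batch)
    {
        // Stand-in for the real durable write (journal append + fsync).
        foreach (var (_, done) in batch)
            done.TrySetResult(true); // callers observe the commit only now
    }
}
```

Note how the loop only blocks when the queue is empty, so an idle system pays no batching delay, while a busy system naturally produces bigger batches.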
There are certain algorithms that try to maximize throughput, such as Nagle's algorithm. They do that by waiting for additional transactions to arrive before actually going to the disk. RavenDB doesn't use that approach. If the system is idle and we get a single transaction, we'll immediately execute and commit it.
But the fact that we don't explicitly do Nagle doesn't mean that it isn't useful. Because we have to wait for the disk, what ends up happening is that under load we start getting more pending transactions in the queue, which will then be executed as a merged unit. In other words, RavenDB implements a dynamic batching approach, driven by the actual I/O constraints and the load on the system. If transactions arrive one at a time, we'll execute them immediately. As the load increases, we'll start creating bigger merged transactions. This way we can keep a fairly consistent response time even when the load on the system grows by leaps and bounds.
The situation is similar when we are talking about distributed transactions. RavenDB uses the Raft protocol for managing its distributed behavior; I'm going to focus just on the batching aspect of that behavior. RavenDB will send an AppendEntries message to the other members in the cluster every 200 ms or so. However, if we have a new command to send to the cluster, it will go out immediately over the network. An important factor here is that we are using TCP, and we require acknowledgment from the other side before we send the next message. As a result of those behaviors, we have pretty much the same situation: depending on the network latency and the processing time, we'll send more entries in a single roundtrip.
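The network side can be sketched the same way. This is not RavenDB's Raft implementation; the AppendEntriesSender class and its stubbed acknowledgment are assumptions for illustration:

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

class AppendEntriesSender
{
    private readonly Channel<string> _entries = Channel.CreateUnbounded<string>();

    public bool TryEnqueue(string entry) => _entries.Writer.TryWrite(entry);

    // One loop per follower: the first entry on an idle connection goes out
    // immediately; anything that arrives during the round-trip rides along
    // in the next AppendEntries message.
    public async Task SenderLoopAsync()
    {
        while (await _entries.Reader.WaitToReadAsync())
        {
            var batch = new List<string>();
            while (_entries.Reader.TryRead(out var entry))
                batch.Add(entry);

            if (batch.Count > 0)
                await SendAndAwaitAckAsync(batch); // stand-in: TCP send + follower ack
        }
    }

    private static Task SendAndAwaitAckAsync(List<string> batch) =>
        Task.Delay(10); // placeholder for the real network round-trip
}
```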
In short, the overall behavior of RavenDB is that we'll start the operation immediately on the first action (both for disk and network), and then we'll batch anything that happens while the first operation is in flight and send it as the next operation.
After over a decade of working in this manner, I can say that this has proven to be a highly adaptable system with a minimal number of knobs to mess with. It favors latency over throughput when there isn't a lot of activity and shifts toward favoring throughput over latency as the load grows.
Comments
There are great benefits from the simple observation that you can combine multiple transactions into one and still get the same result. But does it mean that your isolation level is serializable?
From my understanding of ayende's blog, explained in a different way, it would be the following.

Network side: DTO (Data Transfer Object). This is achieved through session.SaveChanges(), where the RavenDB client will combine non-conflicting changes to the database into one HTTP call. Similar to MSFT's batch request.

Local IO: This is similar to SSE (Server Sent Events), where RavenDB needs a queue to process changes to disk in a sequential manner. The requests from the network are concurrent. The goal is to write to disk right away if there is no queue, then buffer and batch process if there is a queue.

To achieve a single write from concurrent sources, we could use ActionBlock with max concurrency set to 1 and a maximum buffer, but such an approach is unable to batch. We could of course use BatchBlock from the DataFlow library, but that batch operation requires a fixed size, so it is not an exact fit. Without .NET Core 3 we could use ConcurrentQueue plus TaskCompletionSource. With the latest .NET, you can use ChannelReader; it can achieve a similar outcome.
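A minimal sketch of the ConcurrentQueue + TaskCompletionSource approach described above (the QueuedWriter class and its stubbed flush are illustrative, not RavenDB's code):

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Each caller awaits its own completion source; one consumer drains and batches.
class QueuedWriter
{
    private readonly ConcurrentQueue<(string Item, TaskCompletionSource<bool> Done)> _queue = new();
    private readonly SemaphoreSlim _signal = new(0);

    public Task WriteAsync(string item)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _queue.Enqueue((item, tcs));
        _signal.Release(); // wake the consumer
        return tcs.Task;   // completes once the batch containing this item is flushed
    }

    public async Task ConsumeAsync()
    {
        while (true)
        {
            await _signal.WaitAsync(); // wait for at least one item - no timer
            var batch = new List<(string Item, TaskCompletionSource<bool> Done)>();
            while (_queue.TryDequeue(out var entry))
                batch.Add(entry);
            if (batch.Count == 0)
                continue; // leftover signal for an item a previous drain already took

            await FlushAsync(batch); // stand-in for the durable write
            foreach (var (_, done) in batch)
                done.TrySetResult(true);
        }
    }

    private static Task FlushAsync(List<(string Item, TaskCompletionSource<bool> Done)> batch) =>
        Task.CompletedTask;
}
```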
Rafal,
Yes, we are technically serializable. You can run a script that executes at that level and have complete freedom / safety there.
However, typical usage is read & write happening in separate transactions (you read the documents, mutate them, and then save). Those happen in different requests, and we don't keep the transaction open between them.
Jason,
Yes, what you are describing is very similar. However, note that the WriteToBatch will proceed in parallel to the work. We have something like:
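(A rough sketch of that shape; the helper names here are illustrative, not the actual RavenDB code:)

```csharp
// Illustrative only - hypothetical helpers, not RavenDB's real implementation.
Task previousWrite = Task.CompletedTask;
while (true)
{
    var batch = PullAndMergePendingTransactions(); // gather whatever is queued
    ExecuteCommands(batch);                        // compute for this batch...

    previousWrite.Wait();                          // ...overlapped the prior write
    previousWrite = WriteToBatchAsync(batch);      // start the I/O, don't wait for it
}
```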
Here is another version of that: https://ayende.com/blog/176257/10x-speedup-utilizing-nagle-algorithm-in-business-application
The question about isolation level raises another one - is there any possible interference between separate transactions caused by the fact that they have been merged? For example, dirty reads - seeing changes made by another transaction in a batch, which shouldn't be visible because that transaction hasn't committed yet? If reads and updates are in separate transactions then it's probably not an issue, but is it even possible?
...or maybe it doesn't matter at all, because the order of transactions in a batch is not part of the contract, and in the end either all will be committed or nothing - so no dirty reads, just reading of data that would be there anyway if the transactions were executed one by one.
Rafal,
No, they are running independently, one after another. It is possible for you to see uncommitted changes, yes, but from the transaction's perspective it doesn't matter: we are seeing uncommitted changes that will be committed by the overall merged transaction. If there is an error, we'll retry the transaction independently.
Rafal, that is correct, there is no guarantee on the order of transactions in the batch. And the final commit is all or nothing.
Jason,
If you're not familiar already, you may want to check out CosmosDB's Bulk support (source here), which implements a similar batching system with the Timer Wheel algorithm. (It also uses task continuations so that even though requests are posted as a batch, each request can be awaited individually - a trick I haven't seen elsewhere yet.)
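That trick can be sketched like this; it is an illustrative toy, not CosmosDB's actual implementation, and the TimedBatcher name and 10 ms window are assumptions:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Toy timer-wheel-style batcher: requests posted inside the same window are
// flushed together, yet each caller awaits its own TaskCompletionSource.
class TimedBatcher
{
    private readonly object _lock = new();
    private List<(string Req, TaskCompletionSource<string> Done)> _window = new();

    public Task<string> PostAsync(string request)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        lock (_lock)
        {
            if (_window.Count == 0)
                _ = FlushAfterDelayAsync(); // the first request opens a new window
            _window.Add((request, tcs));
        }
        return tcs.Task; // awaited individually, completed by the batch flush
    }

    private async Task FlushAfterDelayAsync()
    {
        await Task.Delay(TimeSpan.FromMilliseconds(10)); // the batching window
        List<(string Req, TaskCompletionSource<string> Done)> batch;
        lock (_lock)
        {
            batch = _window;
            _window = new();
        }
        foreach (var (req, done) in batch)
            done.TrySetResult($"result-for-{req}"); // stand-in for the real batched call
    }
}
```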
Avi,
That is an interesting use case. The problem here is that this requires additional allocations on the client side. I think that a dedicated API / code path would be nicer overall from a UX perspective, but I agree that the technical details are pretty cool.
I only saw Avi's note today, so I don't think he will notice my reply.
I also think it sounds cool, but it is not an ideal design. It is something for inexperienced developers, but less flexible and lower performance. I know where they came from, since I have fixed people's code in EF, where an operation could take an hour to process in SQL, and using EF's bulk actions could easily shrink it down to a few minutes without changing the whole architecture. The average developer has no idea of the bulk concept and they don't care. Maybe it's just the environment I am in, but for Microsoft, the majority of their user base is like that. So making it foolproof rather than performant is the way to go. Even with such an implementation, I doubt many will use that flag.
The reason I like RavenDB's batch approach, or the Graph API's batch approach, is that it is explicit. You set up work and you explicitly tell it to execute. As the Zen of Python puts it: "Explicit is better than implicit."
With explicit, you can control when to execute and how to execute. On the other hand, RavenDB's and the Graph API's explicit approach also means performance. You set up tasks and you call execute; no time needs to be spent waiting. No performance penalty. The execution is in ns. Of course I never measured it, but one thing is guaranteed: when you call SaveChanges in the RavenDB client, or SendBulk in the Graph API, it executes immediately.

When creating and modifying items, in most cases you want it to be as quick as possible. Especially for reads: once you read the data from the database and you want to modify it, you are racing with other actors. The longer it waits, the higher the chance that optimistic concurrency will throw. That is why, before calling SaveChanges, it is good to Refresh the object and verify again if there was a lengthy operation between the read and the update (see the sketch below).

As for the local disk operation, such a timer wheel approach is also not good. It could easily create a bottleneck, as there is a timer involved. The approach RavenDB takes, or the channel reader & writer approach, has no timer involved: if there are no other items in the queue, it will execute immediately. Of course, it needs an additional allocation before we can make each item individually awaitable, but we don't want to await individually. As long as we can ensure it is in the queue, and the queue can persist through an unexpected shutdown, that's good enough. Such a scheduled operation can still be awaited using a state approach, similar to RavenDB's query patch operation.
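In RavenDB client terms, that read-refresh-verify pattern looks roughly like this. A minimal sketch, assuming store is an initialized IDocumentStore; the Order type and its Status check are hypothetical:

```csharp
using var session = store.OpenSession();
session.Advanced.UseOptimisticConcurrency = true;

var order = session.Load<Order>("orders/1-A");
// ... lengthy work between the read and the update ...

session.Advanced.Refresh(order);      // re-read the latest version before saving
if (order.Status == OrderStatus.Open) // re-verify the precondition still holds
{
    order.Status = OrderStatus.Shipped;
    session.SaveChanges();            // throws ConcurrencyException if we lost the race
}
```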
The timer wheel algorithm is close to (or is) the Nagle algorithm, which is the thing many people hate and want disabled. It has good throughput but high latency. That has been described in the MSFT article you posted, where they clearly state: only use it if you don't care about latency.
Jason,
I pinged Avi, so he should see this.
Thanks Oren.
It is a bit hard to navigate the code through the GitHub website. From just looking at the timer wheel class, it feels similar to the Nagle algorithm, which throttles any request to see if there are follow-ups.
The RavenDB client internally works the same way as the Microsoft Graph API: they both have a RESTful API for batch commands. Here is the RavenDB version.
I have thought about it more after my own post. The timer wheel algorithm's benefit seems very limited. It sounds like you only flip a flag, but without using Task.WaitAll, if you await individually, the calls will still result in separate requests, since awaiting individually means the next line won't execute until the current call has completed. Thus, "each request can be awaited individually" is a false claim, unless the developer adds the tasks synchronously first and then awaits them individually.

Example 1:
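(A hypothetical sketch using a CosmosDB-style container; the item variables are assumptions:)

```csharp
// Awaiting each call individually: the second request cannot even enter a
// batching window until the first has fully completed.
var first  = await container.CreateItemAsync(item1);
var second = await container.CreateItemAsync(item2); // starts only after 'first'
```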
Example 2:
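(The same calls, started first and awaited afterwards, so both can ride in one batch:)

```csharp
// Start both requests without awaiting; the bulk layer can now put them in
// the same batch. Then await each task individually.
var t1 = container.CreateItemAsync(item1);
var t2 = container.CreateItemAsync(item2);
var first  = await t1;
var second = await t2;
```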
With a similar RESTful API in the background, the Microsoft Graph API library exposes it directly: you have to await the HTTP call, then get the result of each individual request through an array of commands. That is hard to work with.
The RavenDB client introduced many different approaches, all powered by the same RESTful API, I believe. SaveChanges is only one of them. There are lazy operations, defer on patch operations, and batch commands through session.Advanced.RequestExecutor. Even though the timer wheel is a unique way to achieve a similar outcome with some latency, personally I prefer performance and a more explicit approach. Less random: if the same code executes 1,000 times, the way it executes should always be the same, and the number of batch requests sent to the database should always be the same. For the timer wheel it is a bit random. I'd better give an example.
As for the local disk operation: as long as each time period doesn't limit the number of tasks that can be executed, and everything is thrown into a queue, there shouldn't be much of a bottleneck problem; it only means a request will be delayed even when there are no other items in the queue.