Ayende @ Rahien

Oren Eini aka Ayende Rahien, CEO of Hibernating Rhinos LTD, which develops RavenDB, a NoSQL Open Source Document Database.

You can reach me by:

oren@ravendb.net

+972 52-548-6969


Posts: 6,879 | Comments: 49,254

time to read 4 min | 749 words

Consider a business that needs to manage leasing apartments to tenants. One of the more important aspects of the business is tracking how much money is due. Because of the highly regulated nature of leasing, there are several interesting requirements that pop up.

The current issue is how to tackle the baseline for eviction. Let’s say that the region the business operates in has the following minimum requirements for eviction:

  • Total unpaid debt (30 days from invoice) that is greater than $2,000.
  • Total overdue debt (30 – 60 days from invoice) that is greater than $1,000.
  • Total overdue debt (greater than 60 days from invoice) that is greater than $500.

I’m using the leasing concept here because it is easy to understand that the date ranges themselves are dynamic. We don’t want to wait for the next first of the month to see the changes.

The idea is that we want to be able to show a grid like this:

[image: the desired grid]

The property manager can then take action based on this data. And here is the raw data that we are working on:

[image: the raw data]

It’s easy to see that this customer still has a balance of $175. Note that this balance is as of July 9th, because we apply payments to the oldest invoice we have. The question now becomes: how can we turn this raw data into the table above?

This turns out to be a bit hard for RavenDB, because it is optimized to answer your queries fast, which means that having to do a recalculation on each query (based on the current date) is not easy. I have already shown how to do this kind of task easily enough when we are looking at a single customer. The problem is that we want an overall view of the system, not just of a single customer. And ideally without it costing too much.

The key observation to handle this efficiently in RavenDB is that we don’t need to actually generate the table above directly. We just need to get the data to a point where it is trivial to do so. After some thinking, I came up with the following desired output:

[image: the desired output]

The idea here is that we are going to give both an overall view of the customer’s account and details about its outstanding debts. The important detail to understand is that this customer status is unlikely to grow too big. We aren’t likely to see customers with debts that span many years, so the size of this document is naturally bounded. The cost of going from this output to the table above is negligible and the process of doing so is obvious. So the only question now is how do we do this?

We are going to utilize RavenDB’s multi-map/reduce to the fullest here. Let’s first look at the maps:

There isn’t really anything interesting here. We are just outputting the data that we need for the second, more interesting stage, the reduce:

There is a whole bunch of stuff going on here, but leaving aside how much JavaScript scares me, let’s dig into this.

The important parameters we have here are:

  • Debt
  • CreditBalance
  • RemainingBalance

We compute the CreditBalance by summing all the outstanding payments for the customer. We then gather up all the debts for the customer and sort them by date ascending. The next stage is to apply the outstanding credits toward each of the debts, erasing them from the list if they have been completely paid off. Along the way, we compute the overall remaining balance as well.

And that is pretty much it. It is important to understand that this code is recursive. In other words, if we have a customer that has a lot of invoices and receipts, we aren’t going to be computing this in one go over everything. Instead, we’ll compute this incrementally, over subsets of the data, applying the reduce function as we go.
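To make the reduce logic concrete, here is a minimal Python sketch of the same idea. It is not the RavenDB JavaScript index from the post. The `Status` shape, its field names and the `map_invoice` / `map_payment` helpers are hypothetical stand-ins for the map output. The reduce pools the credit, sorts the debts by date, pays off the oldest first and drops any debt that reaches zero. Its output has the same shape as its input, so it can be applied again to partial results, which is the recursive property described above.

```python
from dataclasses import dataclass, field
from datetime import date

@dataclass
class Status:
    # Mirrors Debt / CreditBalance / RemainingBalance from the post (shape assumed).
    debts: list = field(default_factory=list)  # (invoice_date, amount) pairs
    credit_balance: float = 0.0
    remaining_balance: float = 0.0

def map_invoice(invoice_date, amount):
    return Status([(invoice_date, amount)], 0.0, amount)

def map_payment(amount):
    return Status([], amount, 0.0)

def reduce_statuses(statuses):
    """Pool the credit, then apply it to the oldest debts first."""
    credit = sum(s.credit_balance for s in statuses)
    debts = sorted((d for s in statuses for d in s.debts), key=lambda d: d[0])
    outstanding = []
    for invoice_date, amount in debts:
        paid = min(credit, amount)
        credit -= paid
        if amount - paid > 0:  # fully paid debts are erased from the list
            outstanding.append((invoice_date, amount - paid))
    remaining = sum(amount for _, amount in outstanding)
    return Status(outstanding, credit, remaining)

entries = [
    map_invoice(date(2019, 6, 1), 1000.0),
    map_invoice(date(2019, 7, 1), 1000.0),
    map_payment(1500.0),
]
all_at_once = reduce_statuses(entries)
# Re-reducing partial results: the reduce output is valid reduce input.
incremental = reduce_statuses([reduce_statuses(entries[:2]), reduce_statuses(entries[2:])])
print(all_at_once.debts)              # [(datetime.date(2019, 7, 1), 500.0)]
print(incremental.remaining_balance)  # 500.0
```

In this sketch the total remaining balance comes out the same however the entries are grouped before reducing. Which specific invoices stay open can depend on the grouping, though. A partial group may apply a payment to a newer invoice while an older one sits in another group, so it is worth checking that against your aging rules.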

Queries on this index are going to be fast, and applying new invoices and receipts is going to require very little effort. You can now also do the usual things you do with indexes. For example, sorting the customers by their outstanding balance or total lifetime value.
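The post calls the step from the reduced output to the grid obvious. Here is a minimal Python sketch of that step, under some stated assumptions:

- Each outstanding debt carries its invoice date and remaining amount.
- The three eviction rules are read as age buckets of 0-30, 30-60 and 60+ days, with thresholds of $2,000, $1,000 and $500.
- Exceeding any one threshold is enough to flag the customer.

The `Debt` class, the bucket names and the `RULES` table are illustrative only. Ages are computed against the `today` argument at query time, so the buckets move with the calendar without reindexing.

```python
from dataclasses import dataclass
from datetime import date

@dataclass
class Debt:
    # Hypothetical shape of one outstanding debt in the reduced customer status.
    invoice_date: date
    amount: float

# Assumed reading of the eviction rules: (bucket, min age, max age or None, threshold).
RULES = [
    ("0-30", 0, 30, 2000),
    ("30-60", 30, 60, 1000),
    ("60+", 60, None, 500),
]

def age_buckets(debts, today):
    """Sum outstanding debt per age bucket, with ages measured from `today`."""
    totals = {name: 0.0 for name, _, _, _ in RULES}
    for debt in debts:
        age = (today - debt.invoice_date).days
        for name, low, high, _ in RULES:
            if age >= low and (high is None or age < high):
                totals[name] += debt.amount
                break
    return totals

def eviction_flags(totals):
    """Buckets over their threshold; assumes exceeding any one of them is enough."""
    return [name for name, _, _, limit in RULES if totals[name] > limit]

# Usage with made-up numbers.
debts = [Debt(date(2019, 5, 1), 600.0), Debt(date(2019, 7, 9), 175.0)]
totals = age_buckets(debts, today=date(2019, 8, 1))
print(totals)                  # {'0-30': 175.0, '30-60': 0.0, '60+': 600.0}
print(eviction_flags(totals))  # ['60+']
```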

time to read 2 min | 333 words

RavenDB is really great at aggregations. Even if you have a stupendous amount of information, it will do really well in crunching through the data and summarizing it for you. This is due to the way RavenDB implements map/reduce operations: it allows us to instantly give you aggregation results, regardless of data size. However, this approach requires that you tell RavenDB up front how you want to do the aggregation. This allows RavenDB to do the work ahead of time, as you modify the data, instead of each time you query for it.

A question was raised in the mailing list: how can you use RavenDB to do arbitrary ranges during aggregation? For example, let’s say that we want to be able to look at the total charges for a customer, but slice it so we’ll have:

  • Active – 7 days back
  • Recent – 7 – 21 days back
  • History – 21 – 60 days back

And each user can define their own time periods for the Active / Recent / History ranges. That complicates things somewhat, but luckily, RavenDB has multiple ways to solve this issue. First, let’s create the appropriate index for this.

This isn’t really anything special. It will simply aggregate the data by company and by date. That isn’t enough for what we want to query. For that, we need to reach for another tool in our belt, facets.

Here is the relevant query:

[image: the facet query]

And here is what the output looks like:

[image: the query output]

The idea is that we have two stages for the process. First, we use the map/reduce index to pre-aggregate the data at a daily level. Then we use facets to roll up the information to the level we desire. Instead of having to go through the raw data, we can operate on the partially aggregated data and dramatically reduce the overall cost.
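The two stages can be sketched in plain Python to show the shape of the computation. This is not the RavenDB index or facet API:

- `daily_totals` plays the role of the map/reduce index, producing one row per company and day.
- `roll_up` plays the role of the range facets. It buckets those daily rows into whatever Active / Recent / History periods a given user has configured.

The data and names are made up for the example.

```python
from collections import defaultdict
from datetime import date

def daily_totals(charges):
    """Stage 1, the index's job: sum raw charges per (company, day)."""
    totals = defaultdict(float)
    for company, day, amount in charges:
        totals[(company, day)] += amount
    return dict(totals)

def roll_up(totals, company, today, ranges):
    """Stage 2, the facets' job: bucket daily totals into user-defined ranges.

    `ranges` holds (name, from_days_back, to_days_back) with to_days_back exclusive.
    """
    result = {name: 0.0 for name, _, _ in ranges}
    for (c, day), amount in totals.items():
        if c != company:
            continue
        days_back = (today - day).days
        for name, start, end in ranges:
            if start <= days_back < end:
                result[name] += amount
                break
    return result

charges = [
    ("companies/1", date(2019, 7, 30), 40.0),
    ("companies/1", date(2019, 7, 30), 10.0),
    ("companies/1", date(2019, 7, 20), 25.0),
    ("companies/1", date(2019, 6, 15), 5.0),
]
totals = daily_totals(charges)  # the two July 30 charges collapse into one row
user_ranges = [("Active", 0, 7), ("Recent", 7, 21), ("History", 21, 60)]
summary = roll_up(totals, "companies/1", date(2019, 8, 1), user_ranges)
print(summary)  # {'Active': 50.0, 'Recent': 25.0, 'History': 5.0}
```

The roll-up touches only one row per company per day, no matter how many raw charges went into it. That is where the savings come from.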

time to read 2 min | 243 words

The question recently came up in a discussion with a customer. They have an existing binary storage solution that they want to migrate to RavenDB. No problems, right? RavenDB has attachments support for just this reason, after all.

The key from their perspective is that their current solution gives them a file system abstraction, and they want to keep the same concept when moving the data to RavenDB. In RavenDB, we tend to think about the data as binaries attached to documents, not as raw files. But the actual solution ended up being quite elegant.

We start by creating the following “document”.

At the moment, it is pretty bare bones, but you can add additional items here, such as owner, collaborators, etc. That depends entirely on your system and needs and doesn’t impact how we will be using this. With just this class, we can now build the API. I’ll first show the code, and then discuss it.

As you can see, there really isn’t much here. The idea is that the Folder is used to store the child folders, and the files inside the folder are attachments on the folder document. We expose the typical file system operations (put, list, get file, etc) in a simple interface.
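Here is a minimal in-memory sketch of that model, assuming folder paths are used as document ids. Each `Folder` holds the names of its child folders, and its files are stored as its attachments. The `Folder` and `FileSystem` classes and their method names are hypothetical. Dictionaries stand in for the document store, so this shows the data layout only. It does not use the RavenDB client API and gives none of the transactional guarantees a real session would.

```python
from dataclasses import dataclass, field

@dataclass
class Folder:
    # Hypothetical folder "document": child folder names plus files as attachments.
    id: str
    folders: list = field(default_factory=list)
    attachments: dict = field(default_factory=dict)  # file name -> content

class FileSystem:
    """In-memory stand-in for the document store, keyed by folder path."""

    def __init__(self):
        self.docs = {"/": Folder("/")}

    def mkdir(self, parent, name):
        path = parent.rstrip("/") + "/" + name
        if path not in self.docs:
            self.docs[path] = Folder(path)
            self.docs[parent].folders.append(name)
        return path

    def put_file(self, folder, name, data):
        self.docs[folder].attachments[name] = data

    def get_file(self, folder, name):
        return self.docs[folder].attachments.get(name)

    def list_folder(self, folder):
        doc = self.docs[folder]
        return sorted(doc.folders), sorted(doc.attachments)

fs = FileSystem()
docs = fs.mkdir("/", "docs")
fs.put_file(docs, "readme.txt", b"hello")
print(fs.list_folder("/"))   # (['docs'], [])
print(fs.list_folder(docs))  # ([], ['readme.txt'])
```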

This allows you to build a transactional file system on top of RavenDB and expose a natural looking file system format. More advanced usages could include getting multiple levels of the folder tree at once, implementing permissions, ownership, etc.

time to read 3 min | 536 words

In my previous post I talked about an interesting challenge, distributing data among many different nodes, each of which can act independently on this data. Sadly, the best example for this scenario is distributing ads, but I hope you’ll excuse me on that.

The first thing to realize about this task is that this is, above anything else, a synchronization problem. We define a certain location as the primary one, the one that will accept all the modifications to the data. Each of the nodes will then simply need to connect to that location every now and then and get all the updates.

“Basically a synchronization problem” is like saying that getting a P1 emergency bug on Friday night just before the movie starts is a “tad annoying”. The problem is that to do synchronization properly, you have to model your data properly, make sure that you keep track of changes and be able to send partial changes down the wire efficiently. That is not a simple task at all.

In this post, I want to offer another option to handle this: using Raft. This is a strange use case for a consensus algorithm, I’ll admit. I guess that technically you could run a Raft consensus over 10,000 nodes. Just don’t expect it to be making any sort of decisions. So why am I offering Raft for a scenario where we have that many nodes? Because Raft, at its core, is a way to achieve consensus on a distributed log, that is all. And no one says that you must get that distributed log only via Raft directly.

The idea is basically to have a single source of truth. This can be a single server or it can be a Raft cluster with 3 – 7 nodes in it. All writes in the system are going to go there. The actual process is very well understood and there are multiple ways to do that. The simplest one to consume is likely rqlite. The log, in the case of rqlite, is going to be the SQL statements that are going to be applied to a sqlite database.

But how does this solve the problem of distributing the data? The answer is simple: we already have a way to disseminate distributed state, the log itself. What is going to happen is that each of the nodes at the edge will connect to the cluster and ask for the log entries committed after the last entry it has. When a node gets them, it can apply these statements to its own local copy of sqlite and know that it is now up to date with the state of the system at that point in time.
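The catch-up step can be sketched with Python’s built-in `sqlite3` module. The log here is an in-memory list of `(index, sql)` pairs standing in for what the cluster has committed. How an edge node actually fetches entries from rqlite or another Raft implementation is not shown. The names `entries_after`, `EdgeNode` and the `applied` bookkeeping table are hypothetical. The node stores the index of the last entry it applied in the same transaction as the entries themselves. A failure in the middle of a sync therefore leaves it at its previous consistent position.

```python
import sqlite3

# Stand-in for the committed log: ordered (index, sql) entries.
LOG = [
    (1, "CREATE TABLE ads (id INTEGER PRIMARY KEY, title TEXT)"),
    (2, "INSERT INTO ads (id, title) VALUES (1, 'Baby toys')"),
    (3, "UPDATE ads SET title = 'Baby toys sale' WHERE id = 1"),
]

def entries_after(log, last_index):
    """What the source of truth would hand back: every entry after last_index."""
    return [entry for entry in log if entry[0] > last_index]

class EdgeNode:
    def __init__(self):
        # isolation_level=None: no implicit transactions, we issue BEGIN/COMMIT ourselves.
        self.db = sqlite3.connect(":memory:", isolation_level=None)
        self.db.execute("CREATE TABLE applied (last_index INTEGER)")
        self.db.execute("INSERT INTO applied VALUES (0)")

    def last_index(self):
        return self.db.execute("SELECT last_index FROM applied").fetchone()[0]

    def sync(self, log):
        self.db.execute("BEGIN")
        try:
            for index, sql in entries_after(log, self.last_index()):
                self.db.execute(sql)
                self.db.execute("UPDATE applied SET last_index = ?", (index,))
            self.db.execute("COMMIT")
        except Exception:
            self.db.execute("ROLLBACK")
            raise

node = EdgeNode()
node.sync(LOG[:2])  # the first sync only sees the first two entries
node.sync(LOG)      # catching up applies just entry 3
print(node.last_index())                                    # 3
print(node.db.execute("SELECT title FROM ads").fetchall())  # [('Baby toys sale',)]
```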

This approach skips over the need to architect your data for sync (which is hard) and pushes all of that complexity down the stack to your infrastructure. If the number of nodes you have is large enough, you might need to introduce mirrors to reduce the load. But that fits very nicely into the architecture without really needing to change anything.

time to read 4 min | 648 words

Before I start discussing this topic, I want to talk a bit about the speed of light. That pesky limit basically means that there is an inherent lag in passing information between any two points in space. In your daily life, you can mostly ignore it. The human brain is far too slow to perceive it, and even if you are working with computers, you can usually ignore the speed of light for anything less than about 500 miles.

But the speed of light is merely the hard upper limit of our ability to send information from one location to another. In practice, the lag time between any two computers connected to a network is much higher. In fact, if you are a gamer, you are very well acquainted with that fact.

Let’s assume that we have a need to do the following:

  • Hold a mutable state of some kind.
  • Modify it in a consistent manner.
  • Distribute it to many locations, where many is at least hundreds and potentially tens of thousands of locations.

Let’s break this up a bit into its component parts. A mutable state basically means that we can modify it over time, and that these modifications show up in all locations. The speed of light (and network lag) ensures that we can’t have this happen instantaneously. And, of course, the killer requirement here is that we need to do this in a consistent manner.

I find it really hard to talk about abstract problems concretely, so let’s use an example. We have a set of tax rules that determine how certain products should be taxed. The ruleset is big, changes on a regular basis and it is quite important to get things right. Oh, and we also need to push those rules to tens of thousands of locations that will independently compute taxes for purchases.

In this case, the solution is quite easy. We have a single source of truth that all the locations can pull the tax ruleset from (directly or through mirrors). We’ll also ensure that tax rules are applied only at some future date from their creation, to allow time for all these locations to be updated. If a location hasn’t had an updated ruleset for over a certain amount of time, it can refuse to process any payments until it gets an update. This example has really very little for us to do in terms of design.
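Here is a minimal sketch of the location side of that policy, assuming each ruleset carries an `effective_from` timestamp set in the future when it is published. The `TaxRuleset` shape, the two-day `MAX_STALENESS` window and the rates are illustrative assumptions, not values from the post.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class TaxRuleset:
    version: int
    effective_from: datetime  # set in the future when published, to give locations time to sync
    rates: dict               # product category -> tax rate (illustrative)

MAX_STALENESS = timedelta(days=2)  # assumed policy value

def active_ruleset(rulesets, now):
    """Newest ruleset whose effective date has arrived, or None."""
    in_effect = [r for r in rulesets if r.effective_from <= now]
    return max(in_effect, key=lambda r: r.effective_from, default=None)

def compute_tax(rulesets, last_update, now, category, amount):
    if now - last_update > MAX_STALENESS:
        raise RuntimeError("ruleset is stale; refusing payments until updated")
    ruleset = active_ruleset(rulesets, now)
    if ruleset is None:
        raise RuntimeError("no ruleset in effect yet")
    return amount * ruleset.rates[category]

rulesets = [
    TaxRuleset(1, datetime(2019, 1, 1), {"food": 0.05}),
    TaxRuleset(2, datetime(2019, 9, 1), {"food": 0.07}),  # published early, not in effect yet
]
now = datetime(2019, 8, 1)
print(compute_tax(rulesets, datetime(2019, 7, 31), now, "food", 100.0))
```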

A better example would have multiple actors changing the data as we work with it. An actual example of when this is required is an ad provider that needs to run ads in many physical locations. Each location is actually an IoT device that includes a computer, a screen and a camera. Each such device decides independently what ads should be shown (for example, if the camera sees a baby stroller, it shows ads for baby toys).

I actually had to search hard for an example that would work for this scenario, but I think this is a good one. We obviously have a lot of activity on the ad provider side, with many people registering and modifying ad settings. We need that portion of our business to be consistent (this is what we are charging people for, after all). However, given the probably high number of ad devices spread all over, we can’t rely on them always being connected to a central server or even on them being connected at all.

And even if there is no connectivity and nothing new to show, we must show something. Even if we aren’t getting paid for showing the ad, not showing something or (actually worse) showing an error is something that we can’t tolerate.

Given all of these constraints, how would you build such a system?

time to read 10 min | 1862 words

I spent some idle time thinking about the topic lately, and I think that this can make a pretty good blog post. The goal is to have a generic application level network protocol for client/server and server/server communication. There are a lot of them out there, and this isn’t actually design work that I intend to implement. It is a thought exercise that runs through a lot of the reasoning behind the design.

There are network protocols that are specific to a purpose, and that is reflected in a lot of hidden assumptions they have. I’m trying to conceive of a protocol that would be generic enough to be used for a wide variety of cases. Here are the key points in my thinking.

  • Don’t reinvent the wheel
  • Security is a must
  • RPC is the most common scenario
  • Don’t cause problems for the client / server by messing up the protocol
  • Debuggability can’t be bolted on
  • Push model should also work

By not reinventing the wheel I mean that this should be relatively simple to implement. That pretty much limits us to TCP as the underlying mechanism. I’m actually going to specify a stream based communication protocol instead, though. With the advent of QUIC, HTTP/3, etc., that might actually be useful. But the whole idea is that the underlying abstraction that we want to rely on is a connection between two nodes that is a stream. All the issues of packet ordering, retries, congestion, etc. are to be handled at that level.

In this day and age, security is not an optional requirement, and it should be incorporated into the design of the system from the get go. I absolutely adore TLS, and it solves a whole bunch of problems for us at the same time. It gives us a secure channel, it handles authentication on both ends and it is both widely understood and commonly used. This means that by selecting TLS as the security mechanism, we aren’t limiting any clients. So the raw protocol we rely on is TLS/TCP, with authentication done using client certificates.

By far the most common usage for a network protocol is the request/reply model. You can see it in HTTP, SMTP, POP3 and most other network protocols. There is a problem with this model, though. A simple request/reply protocol is going to cause scalability and management issues for the users. What do I mean by that? Look at HTTP as a great example. It is a simple request/reply protocol, and that fact has caused a lot of complexity for users. If you want to send several requests in parallel, you need multiple connections, and head-of-line blocking is a real problem. This impacts both clients and servers and can cause a great deal of hardship for all. Indeed, this is why HTTP/2 uses framing to send multiple requests over one connection without specifying the order in which the server replies to them.

A better model would be to break that kind of dependency, and I’m likely going to be modeling at least some of that on the design of HTTP/2.

Speaking of which, HTTP/2 is a binary protocol, which is great if you have the entire internet behind you. If you are designing a network protocol that isn’t going to be natively supported by all and sundry, you are going to need to take into account the debuggability of the solution. The protocol I have in mind is a text based protocol and should be usable from the command line by using something like:

openssl s_client -connect my_server:4833

This will give you what is effectively a shell into the server, and you should be able to write commands there and get their results. I used to play around a lot with network protocols, and being able to telnet to a server and manually play with the commands is an amazing experience. As a side effect of this, it also means that having a trace of the communication between client and server will be amazingly useful for diagnostics down the line. For that matter, in certain industries, being able to capture the communication trace might be an absolute requirement for auditing purposes (who did what and when).

So, here is what we have so far:

  • TLS/TCP as the underlying protocol
  • Text based so we can interact with it manually

What we are left with, though, is the question of what the actual data on the wire is going to look like.

I’m not going to be too fancy, and I want to stick closely to stuff that I know that works. The protocol will use messages as the indivisible unit of communication. A message will have the following structure (using RavenDB as the underlying model):

GET employees/1-A employees/2-B
Timeout: 30
Sequence: 293
Include: ReportsTo

PUT "document with spaces"
Sequence: 294
Body: chunked

39
<<binary data 39 bytes in len>>
 

So, basically, we have a line oriented protocol (each line separated by \r\n, and limited to a well known maximum size). A message starts with a command line, which has the following structure:

cmd (token) args[] (token)

Where token is either a sequence of characters without whitespace or a quoted string if it contains whitespace.

Following the command line, you have the headers, which pretty much follow the design of HTTP headers. They are used to pass additional information, such as command specific data (like the Include header on the first command) or protocol details (like the timeout for that particular command, or the fact that the second command has a body and how to read it). The headers end with an empty line, and then you have an optional body.
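Here is a minimal Python sketch of parsing this framing, covering the command line and the headers. The names `Message`, `tokenize` and `parse_message` are hypothetical. Quoted tokens are assumed to use plain double quotes with no escape sequences. Body handling is left out because the post does not pin down the chunked format.

```python
from dataclasses import dataclass, field

@dataclass
class Message:
    cmd: str
    args: list
    headers: dict = field(default_factory=dict)

def tokenize(line):
    """Split a command line into tokens; a token with whitespace must be double-quoted.

    Escapes inside quoted strings are not handled (the post does not define them).
    """
    tokens, i = [], 0
    while i < len(line):
        if line[i].isspace():
            i += 1
        elif line[i] == '"':
            end = line.index('"', i + 1)  # ValueError if the quote is never closed
            tokens.append(line[i + 1:end])
            i = end + 1
        else:
            end = i
            while end < len(line) and not line[end].isspace():
                end += 1
            tokens.append(line[i:end])
            i = end
    return tokens

def parse_message(lines):
    """Parse the command line and headers; stops at the empty line before any body."""
    it = iter(lines)
    cmd, *args = tokenize(next(it))
    msg = Message(cmd, args)
    for line in it:
        if line == "":
            break
        name, _, value = line.partition(":")
        msg.headers[name.strip()] = value.strip()
    return msg

raw = 'PUT "document with spaces"\r\nSequence: 294\r\nBody: chunked\r\n\r\n'
msg = parse_message(raw.split("\r\n"))
print(msg.cmd, msg.args, msg.headers)
# PUT ['document with spaces'] {'Sequence': '294', 'Body': 'chunked'}
```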

The headers here serve a very important role. As you can see, they are key for protocol flexibility and enabling versioning. They give us a good way to add additional data after the first deployment without breaking everything.

Because we want to support people typing this manually, we’ll probably need some way to specify message bodies that a human can type on their own without having to compute sizes upfront. This is something that will likely be needed only for human input, so we can probably define a terminating token for it. I’m not focusing on this because it isn’t a mainline feature, but I wanted to mention it because debuggability isn’t a secondary concern.

You might have noticed a repeated header in the commands I sent: the Sequence header. This one is optional (when a human writes it) but will be very useful for tracing, so tools will always add it. A stream connection is actually composed of two channels, the read and the write. On the read side of this protocol, we read a full command from the network, hand it off to something else to process it and go right back to reading from the network. This design is suitable for the event based systems that have proven to be so useful in scaling the amount of work a server can handle. Because we can start reading the next command while we process the current ones, we greatly reduce the number of connections we require and enable a lot more parallel work.

Once a message has been processed, the reply is sent back to the client. Note that there is no requirement that the replies will be sent in the same order as the requests that initiated them. That means that an expensive operation on the server side isn’t going to block cheaper operations that came after it, which, again, is important for the overall speed of the system. It also lends itself quite nicely to event loop based processing. The sequence number of the request is used in the reply to ensure that the client can properly correlate the reply to the relevant request.

On the client side, you write a command to the network. When reading from the network, you need to keep track of the sequence number you sent and route it back to the right caller. The idea here is that on the client side, you may have a single connection that is shared among several threads, reducing the number of overall connections you need and getting better overall utilization from the network.
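Here is a minimal asyncio sketch of the client side. Many callers share one connection, and each request gets the next sequence number. Whichever reply arrives is matched back to its waiting caller by that number. The names `Client`, `request` and `on_reply` are hypothetical. `send` stands in for writing to the TLS stream, and the network read loop that would call `on_reply` is not shown.

```python
import asyncio
import itertools

class Client:
    """Shares one connection among many callers, matching replies by Sequence."""

    def __init__(self, send):
        self._send = send               # stand-in for writing to the TLS stream
        self._seq = itertools.count(1)
        self._pending = {}              # sequence number -> future for the reply

    async def request(self, command):
        seq = next(self._seq)
        future = asyncio.get_running_loop().create_future()
        self._pending[seq] = future
        self._send(f"{command}\r\nSequence: {seq}\r\n\r\n")
        return await future

    def on_reply(self, seq, body):
        """Called by the (not shown) network read loop for every reply."""
        future = self._pending.pop(seq, None)
        if future is not None and not future.done():
            future.set_result(body)

async def main():
    sent = []
    client = Client(sent.append)
    first = asyncio.ensure_future(client.request("GET employees/1-A"))
    second = asyncio.ensure_future(client.request("GET employees/2-B"))
    await asyncio.sleep(0)               # let both requests go out
    client.on_reply(2, "employees/2-B")  # the cheaper request finishes first
    client.on_reply(1, "employees/1-A")
    return (await first, await second), sent

replies, sent = asyncio.run(main())
print(replies)  # ('employees/1-A', 'employees/2-B')
```

The replies are matched by sequence number, so each caller gets its own answer even though the server answered the second request before the first one.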

A nice property of this design is that you don’t have to do things this way. If you don’t want to support concurrent requests / replies, just have a single connection and wait to read the reply from the server whenever you make a request. That gives you the option of a simple stateful approach, but also an easy upgrade path down the line if / when you need it. The fact that the mental model of the user is request/reply is a great help, to be honest, even if this isn’t what is actually going on. This greatly reduces the amount of complexity that a user needs to keep in their head.

Some details of the protocol would need gentle massaging, to ensure that a human on the command line can type reasonable commands, but that is fairly straightforward. The text based nature of the communication also lends itself nicely to tracing / audits. At the client or server level, we can write a <connection-id>.trace file that will have all the reads and writes on that connection. During debugging, you can just tail the right <connection-id>.trace file and see exactly what is going on, or just zip them for archiving and auditing.
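The tracing hook can be as simple as appending every line that crosses the connection to a per-connection file. Each line is prefixed with a timestamp and a direction marker. This is a sketch: `TracedConnection`, `on_write` / `on_read` and the `>>` / `<<` markers are hypothetical. The `OK` reply in the usage is an invented placeholder, since the post does not define a reply format. Reopening the file for every message keeps the code short; a real implementation would keep it open.

```python
import os
import tempfile
from datetime import datetime, timezone

class TracedConnection:
    """Appends every line read or written to <connection-id>.trace (sketch)."""

    def __init__(self, connection_id, directory):
        self.path = os.path.join(directory, f"{connection_id}.trace")

    def _log(self, direction, text):
        stamp = datetime.now(timezone.utc).isoformat()
        # Reopening per message keeps the sketch short; keep the file open in real code.
        with open(self.path, "a", encoding="utf-8") as f:
            for line in text.split("\r\n"):
                f.write(f"{stamp} {direction} {line}\n")

    def on_write(self, text):
        self._log(">>", text)

    def on_read(self, text):
        self._log("<<", text)

with tempfile.TemporaryDirectory() as directory:
    conn = TracedConnection("conn-42", directory)
    conn.on_write("GET employees/1-A\r\nSequence: 293\r\n")
    conn.on_read("OK\r\nSequence: 293\r\n")  # "OK" is a placeholder reply
    with open(conn.path, encoding="utf-8") as f:
        trace = f.read()
print(trace)
```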

Speaking of zipping, let’s consider the following command:

OPTIONS gzip

This command on the connection can do things like change how we encode the data; in this case, it asks the server to use gzip for all reads and writes from now on. The server can reply with a message (uncompressed) saying that it is now switching compression, and everything from that point forward will be compressed. Note that unlike HTTP compression, we can get the benefits of compression across multiple requests, and given that most requests / replies have a lot of the same structure, that is likely to benefit us quite a lot.
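The effect of compressing the whole connection, rather than each message, can be shown with Python’s `zlib` module. This sketch assumes the connection uses one deflate stream in gzip framing (`wbits=31`). It flushes with `Z_SYNC_FLUSH` after every message, so the receiver can decode the message right away while the compressor keeps its history. In a real connection, each side would have its own compressor for writes and decompressor for reads. Here both live in one hypothetical `CompressedStream` object so the round trip fits in a few lines.

```python
import zlib

class CompressedStream:
    """One gzip-framed deflate stream shared by every message on a connection."""

    def __init__(self):
        self._compressor = zlib.compressobj(wbits=31)  # 31 selects the gzip container
        self._decompressor = zlib.decompressobj(wbits=31)

    def encode(self, message: bytes) -> bytes:
        # Z_SYNC_FLUSH emits everything so far but keeps the compression history.
        return self._compressor.compress(message) + self._compressor.flush(zlib.Z_SYNC_FLUSH)

    def decode(self, chunk: bytes) -> bytes:
        return self._decompressor.decompress(chunk)

stream = CompressedStream()
messages = [
    f"GET employees/{i}-A\r\nTimeout: 30\r\nSequence: {290 + i}\r\nInclude: ReportsTo\r\n\r\n".encode()
    for i in range(1, 4)
]
chunks = [stream.encode(m) for m in messages]
assert [stream.decode(c) for c in chunks] == messages
print("shared stream:", [len(c) for c in chunks])
print("each alone:   ", [len(zlib.compress(m)) for m in messages])
```

On messages like these, the later chunks should come out much smaller than compressing each message on its own. Most of their bytes can refer back to earlier messages in the stream.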

The last topic I listed is the notion of push operations and how they should be handled. Given that we don’t have a strict request/reply model, there is an obvious way for the server to send additional data “out of band”. A client can request to be notified by the server of certain things, and the server will make a note of that and just send the data back at some later time. There is obviously the need to correlate the push notification to the original request, but that is what we have the headers for. A simple CorrelationId header on the original request and the push notification will be sufficient for the client side to be able to route that to the right callback.
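Here is a minimal sketch of the client-side routing for pushes, keyed on the CorrelationId header. `PushRouter`, the `sub-N` id format and the `WATCH` command in the usage are hypothetical. The headers are assumed to be already parsed into a dictionary.

```python
class PushRouter:
    """Routes unsolicited server messages to callbacks by their CorrelationId header."""

    def __init__(self):
        self._callbacks = {}  # correlation id -> callback
        self._next_id = 0

    def subscribe(self, command, callback):
        """Return the message to send and the CorrelationId that pushes will carry."""
        self._next_id += 1
        correlation_id = f"sub-{self._next_id}"
        self._callbacks[correlation_id] = callback
        return f"{command}\r\nCorrelationId: {correlation_id}\r\n\r\n", correlation_id

    def dispatch(self, headers, body):
        """Deliver a push; returns False if nobody subscribed with that id."""
        callback = self._callbacks.get(headers.get("CorrelationId"))
        if callback is None:
            return False
        callback(body)
        return True

router = PushRouter()
received = []
message, cid = router.subscribe("WATCH employees/1-A", received.append)
# Later, the server sends a message carrying the same CorrelationId header.
router.dispatch({"CorrelationId": cid}, "employees/1-A changed")
print(received)  # ['employees/1-A changed']
```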

I think that this is pretty much it. This should cover enough to give you a clear idea about what is required, and I believe that it is enough for a thought exercise. There are a lot of other details that would need to be answered. For example, how do you deal with very large responses? I would assume you break them into multiple messages, to avoid holding up the connection for other requests. But that should be the gist of it.


RECENT SERIES

  1. re (22):
    19 Aug 2019 - The Order of the JSON, AKA–irresponsible assumptions and blind spots
  2. Design exercise (6):
    01 Aug 2019 - Complex data aggregation with RavenDB
  3. Reviewing mimalloc (2):
    22 Jul 2019 - Part II
  4. Production postmortem (26):
    07 Jun 2019 - Printer out of paper and the RavenDB hang
  5. Reviewing Sled (3):
    23 Apr 2019 - Part III