Ayende @ Rahien

Oren Eini, aka Ayende Rahien, is the CEO of Hibernating Rhinos LTD, which develops RavenDB, a NoSQL open source document database.

time to read 3 min | 536 words

In my previous post I talked about an interesting challenge: distributing data among many different nodes, each of which can act independently on this data. Sadly, the best example for this scenario is distributing ads, but I hope you’ll excuse me on that.

The first thing to realize about this task is that it is, above anything else, a synchronization problem. We define a certain location as the primary one, the one that will accept all the modifications to the data. Each of the nodes then simply needs to connect to that location every now and then and get all the updates.

“Basically a synchronization problem” is like saying that getting a P1 emergency bug on Friday night just before the movie starts is a “tad annoying”. The problem is that to do synchronization properly, you have to model your data properly, make sure that you keep track of changes, and be able to send partial changes down the wire efficiently. That is not a simple task at all.

In this post, I want to offer another option to handle this: using Raft. This is a strange use case for a consensus algorithm, I’ll admit. I guess that technically you could run Raft consensus over 10,000 nodes. Just don’t expect it to be making any sort of decisions. So why am I offering Raft for a scenario where we have that many nodes? Because Raft, at its core, is a way to achieve consensus on a distributed log, that is all. And no one says that you must get that distributed log only via Raft directly.

The idea is basically to have a single source of truth. This can be a single server or it can be a Raft cluster with 3 – 7 nodes in it. All writes in the system are going to go there. The actual process is very well understood and there are multiple ways to do that. The simplest one to consume is likely rqlite. The log, in the case of rqlite, is going to be the SQL statements that are going to be applied to a sqlite database.

But how does this solve the problem of distributing the data? The answer is simple: we already have a way to disseminate distributed state, the log itself. What is going to happen is that each of the nodes at the edge will connect to the cluster and ask for a copy of the log as of the last committed entry that they have. When they get that, they can apply these statements to their own local copy of sqlite and know that they are now up to date with the state of the system as of that point in time.
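
To make that concrete, here is a minimal sketch of what an edge node’s sync loop might look like in Python. It assumes a hypothetical HTTP endpoint on the central cluster that returns the log entries after a given index as JSON; rqlite’s real API is different, this is just the shape of the idea.

import json
import sqlite3
import time
import urllib.request

# Hypothetical endpoint on the central cluster; rqlite's real API differs.
LOG_URL = "https://central.example.com/log?after={}"

def last_applied_index(db):
    # Keep track of how far into the log this node has gotten, in the same database.
    db.execute("CREATE TABLE IF NOT EXISTS sync_state (last_index INTEGER)")
    row = db.execute("SELECT last_index FROM sync_state").fetchone()
    if row is None:
        db.execute("INSERT INTO sync_state (last_index) VALUES (0)")
        db.commit()
        return 0
    return row[0]

def sync_once(db):
    after = last_applied_index(db)
    with urllib.request.urlopen(LOG_URL.format(after)) as response:
        entries = json.load(response)      # e.g. [{"index": 42, "sql": "UPDATE ..."}, ...]
    for entry in entries:
        db.execute(entry["sql"])           # apply the statement to the local copy
        db.execute("UPDATE sync_state SET last_index = ?", (entry["index"],))
    db.commit()                            # commit the whole batch atomically

db = sqlite3.connect("local-copy.db")
while True:
    sync_once(db)
    time.sleep(30)                         # poll the primary every now and then

The important property is that the node only ever asks “what happened after entry N”, which is exactly the question a replicated log is built to answer.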

This approach skips over the need to architect your data for sync (which is hard) and pushes all of that complexity down the stack to your infrastructure. If the number of nodes you have is large enough, you might need to introduce mirrors to reduce the load. But that fits very nicely into the architecture without really needing to change anything.

time to read 4 min | 648 words

Before I start discussing this topic, I want to talk a bit about the speed of light. That pesky limit basically means that there is an inherent lag in passing information between any two points in space. In your daily life, you can mostly ignore it. The human brain is far too slow to perceive it, and even if you are working with computers, you can usually ignore the speed of light for anything less than about 500 miles.

But the speed of light is merely the hard upper limit of our ability to send information from one location to another. In practice, the lag time between any two computers connected to a network is much higher. In fact, if you are a gamer, you are very well acquainted with that fact.

Let’s assume that we have a need to do the following:

  • Hold a mutable state of some kind.
  • Modify it in a consistent manner.
  • Distribute it to many locations, where many is at least hundreds and potentially tens of thousands of locations.

Let’s break this up a bit into its component parts. A mutable state basically means that we can modify it over time, and that these modifications show up in all locations. The speed of light (and network lag) ensures that we can’t have this happen instantaneously. And, of course, the killer requirement here is that we need to do this in a consistent manner.

I find it really hard to talk about abstract problems concretely, so let’s use an example. We have a set of tax rules that determine how certain products should be taxed. The ruleset is big, changes on a regular basis, and it is quite important to get things right. Oh, and we also need to push those rules to tens of thousands of locations that would independently compute taxes for purchases.

In this case, the solution is quite easy. We have a single source of truth that all the locations can pull the tax ruleset from (directly or through mirrors). We’ll also ensure that tax rules are applied only at some future date from their creation, to allow time for all these locations to be updated. If a location hasn’t had an updated ruleset for over a certain amount of time, it can refuse to process any payments until it gets an update. This example leaves really very little for us to do in terms of design.
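
As a tiny illustration, the refusal rule on a location could look something like the sketch below; the three-day staleness limit and the field names are assumptions for the example, not anything prescribed above.

from datetime import datetime, timedelta, timezone

# Assumed policy: refuse to charge anyone after three days without a ruleset update.
MAX_STALENESS = timedelta(days=3)

def can_process_payments(ruleset_fetched_at):
    # ruleset_fetched_at: when this location last pulled an update from the source of truth.
    return datetime.now(timezone.utc) - ruleset_fetched_at <= MAX_STALENESS

def active_rules(rules, now=None):
    # Rules are published with a future effective date, so every location has had
    # time to pull them before they start to apply.
    now = now or datetime.now(timezone.utc)
    return [rule for rule in rules if rule["effective_from"] <= now]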

A better example would have multiple actors changing the data as we work with it. An actual example of when this is required is an ad provider that needs to run ads in many (physical) locations. Each location is actually an IoT device that includes a computer, a screen and a camera. Each such device decides independently what ads should be shown (for example, if the camera sees a baby stroller, it shows ads for baby toys).

I actually had to search hard for an example that would work for this scenario, but I think this is a good one. We obviously have a lot of activity on the ad provider side, with many people registering and modifying ad settings. We need that portion of our business to be consistent (this is what we are charging people for, after all). However, given the probably high number of ad devices spread all over, we can’t rely on them always being connected to a central server, or even on them being connected at all.

And even if there is no connectivity and nothing new to show, we must show something. Even if we aren’t getting paid for showing the ad, not showing anything or (actually worse) showing an error is something that we can’t tolerate.

Given all of these constraints, how would you build such a system?

time to read 10 min | 1862 words

I spent some idle time thinking about this topic lately, and I think that it can make a pretty good blog post. The goal is to have a generic application level network protocol for client/server and server/server communication. There are a lot of them out there, and this isn’t actually design work that I intend to implement. It is a thought exercise that runs through a lot of the reasoning behind the design.

There are network protocols that are specific to a purpose, and that is reflected in a lot of hidden assumptions they have. I’m trying to conceive a protocol that would be generic enough to be used for a wide variety of cases. Here are the key points in my thinking:

  • Don’t invent the wheel
  • Security is a must
  • RPC is the most common scenario
  • Don’t cause problems for the client / server by messing up the protocol
  • Debuggability can’t be bolted on
  • Push model should also work

By not re-inventing the wheel I mean that this should be relatively simple to implement. That pretty much limits us to TCP as the underlying mechanism. I’m actually going to specify a stream based communication protocol instead, though. With the advent of QUIC, HTTP/3, etc., that might actually be useful. But the whole idea is that the underlying abstraction we want to rely on is a connection between two nodes that is a stream. All the issues of packet ordering, retries, congestion, etc. are to be handled at that level.

In this day and age, security is not an optional requirement, and it should be incorporated into the design of the system from the get go. I absolutely adore TLS, and it solves a whole bunch of problems for us at the same time. It gives us a secure channel, it handles authentication on both ends, and it is both widely understood and commonly used. This means that by selecting TLS as the security mechanism, we aren’t limiting any clients. So the raw protocol we rely on is TLS/TCP, with authentication done using client certificates.
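
As a sketch of how little the client side needs, here is roughly what opening such a channel looks like with Python’s standard ssl module; the host, port and certificate file names are placeholders.

import socket
import ssl

HOST, PORT = "my_server", 4833             # placeholders, matching the example below

# Verify the server against our CA, and present a client certificate so the
# server can authenticate us as well.
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile="ca.pem")
context.load_cert_chain(certfile="client.pem", keyfile="client.key")

with socket.create_connection((HOST, PORT)) as raw:
    with context.wrap_socket(raw, server_hostname=HOST) as conn:
        wire = conn.makefile("rwb")        # a buffered, stream-like view of the connection
        # ... speak the protocol over `wire` ...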

By far the most common usage for a network protocol is the request/reply model. You can see it in HTTP, SMTP, POP3 and most other network protocols. There is a problem with this model, though. A simple request/reply protocol is going to cause scalability and management issues for its users. What do I mean by that? Look at HTTP as a great example. It is a simple request/reply protocol, and that fact has caused a lot of complexity for users. If you want to send several requests in parallel, you need multiple connections, and head of line blocking is a real problem. This impacts both clients and servers and can cause a great deal of hardship for all. Indeed, this is why HTTP/2 introduced framing and allows sending multiple requests without specifying the order in which the server replies to them.

A better model would be to break that kind of dependency, and I’m likely going to be modeling at least some of that on the design of HTTP/2.

Speaking of which, HTTP/2 is a binary protocol, which is great if you have the entire internet behind you. If you are designing a network protocol that isn’t going to be natively supported by all and sundry, you are going to need to take into account the debuggability of the solution. The protocol I have in mind is a text based protocol and should be usable from the command line by using something like:

openssl s_client -connect my_server:4833

This will give you what is effectively a shell into the server, and you should be able to write commands there and get their results. I used to play around a lot with network protocols, and being able to telnet to a server and manually play with the commands is an amazing experience. As a side effect of this, it also means that having a trace of the communication between client and server will be amazingly useful for diagnostics down the line. For that matter, for certain industries, being able to capture the communication trace might be an absolute requirement for auditing purposes (who did what and when).

So, here is what we have so far:

  • TLS/TCP as the underlying protocol
  • Text based, so we can debug it manually

What we are left with, though, is the question of what the actual data on the wire is going to look like.

I’m not going to be too fancy, and I want to stick closely to stuff that I know works. The protocol will use messages as the indivisible unit of communication. A message will have the following structure (using RavenDB as the underlying model):

GET employees/1-A employees/2-B
Timeout: 30
Sequence: 293
Include: ReportsTo

PUT “document with spaces”
Sequence: 294
Body: chunked

39
<<binary data 39 bytes in len>>
 

So, basically, we have a line oriented protocol (each line separated by \r\n, and limited to a well known maximum size). A message starts with a command line, which has the following structure:

cmd (token) args[] (token)

Where token is either a sequence of characters without whitespace or a quoted string if it contains whitespace.

Following the command line, you have the headers, which pretty much follow the design of HTTP headers. They are used to pass additional information, such as the timeout for that particular command, command specific data (like the Include header on the first command), or protocol details (like the fact that the second command has a body and how to read it). A command ends with an empty line, and then you have an optional body.
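
Here is a sketch of what reading and writing such a message could look like over a buffered stream (like the `wire` object from the TLS sketch earlier). The framing details, such as treating the body as a single length-prefixed chunk, are my own simplifications rather than a finished spec.

import shlex

MAX_LINE = 4096                                      # assumed "well known maximum size"

def write_message(wire, cmd, args=(), headers=None, body=None):
    headers = dict(headers or {})
    # Quote any argument that contains whitespace, per the token rule above.
    arg_text = " ".join('"%s"' % a if " " in a else a for a in args)
    lines = [("%s %s" % (cmd, arg_text)) if args else cmd]
    if body is not None:
        headers["Body"] = "chunked"
    lines += ["%s: %s" % (key, value) for key, value in headers.items()]
    wire.write(("\r\n".join(lines) + "\r\n\r\n").encode("utf-8"))
    if body is not None:
        # Simplification: a single chunk, as a length line followed by the raw bytes.
        wire.write(("%d\r\n" % len(body)).encode("utf-8"))
        wire.write(body + b"\r\n")
    wire.flush()

def read_message(wire):
    command = wire.readline(MAX_LINE).decode("utf-8").strip()
    cmd, *args = shlex.split(command)                # honors quoted tokens with spaces
    headers = {}
    while True:
        line = wire.readline(MAX_LINE).decode("utf-8").strip()
        if not line:                                 # an empty line ends the headers
            break
        key, _, value = line.partition(":")
        headers[key.strip()] = value.strip()
    body = None
    if headers.get("Body") == "chunked":
        size = int(wire.readline(MAX_LINE).decode("utf-8").strip())
        body = wire.read(size)
        wire.readline(MAX_LINE)                      # consume the trailing \r\n
    return cmd, args, headers, body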

The headers here serve a very important role. As you can see, they are key for protocol flexibility and enable versioning. They give us a good way to add additional data after the first deployment without breaking everything.

Because we want to support people typing this manually, we’ll probably need to have some way to specify message bodies that a human can type without having to compute sizes upfront. This is something that will likely be needed only for human input, so we can probably define a terminating token that would work. I’m not focusing on this because it isn’t a mainline feature, but I wanted to mention it because debuggability isn’t a secondary concern.

You might have noticed a repeated header in the commands I sent: the Sequence header. This one is optional (when a human writes it) but will be very useful for tracing, so tools will always add it. A stream connection is actually composed of two channels, the read and the write. On the read side of this protocol, we read a full command from the network, hand it off to something else to process it, and go right back to reading from the network. This design is suitable for the event based systems that have proven so useful for scaling the amount of work a server can handle. Because we can start reading the next command while we process the current ones, we greatly reduce the number of connections we require and enable a lot more parallel work.

Once a message has been processed, the reply is sent back to the client. Note that there is no requirement that the replies be sent in the same order as the requests that initiated them. That means that an expensive operation on the server side isn’t going to block cheaper operations that came after it, which is, again, important for the overall speed of the system. It also lends itself quite nicely to event loop based processing. The sequence number from the request is used in the reply to ensure that the client can properly correlate the reply with the relevant request.
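
Here is a rough sketch of that server-side loop, built on the read_message / write_message helpers from the earlier sketch. The handler-pool shape is an assumption; the point is only that reading and processing are decoupled and that replies carry the request’s sequence number.

import threading
from concurrent.futures import ThreadPoolExecutor

def serve_connection(wire, handle):
    # handle(cmd, args, headers, body) -> (reply_cmd, reply_headers, reply_body)
    pool = ThreadPoolExecutor(max_workers=8)
    write_lock = threading.Lock()          # one reply at a time on the shared stream

    def process(cmd, args, headers, body):
        reply_cmd, reply_headers, reply_body = handle(cmd, args, headers, body)
        # Echo the sequence number so the client can correlate the reply,
        # even though replies may go out in a different order than the requests.
        reply_headers["Sequence"] = headers.get("Sequence", "")
        with write_lock:
            write_message(wire, reply_cmd, (), reply_headers, reply_body)

    while True:
        cmd, args, headers, body = read_message(wire)   # read a full command...
        pool.submit(process, cmd, args, headers, body)  # ...hand it off, keep reading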

On the client side, you write a command to the network. When reading from the network, you need to keep track of the sequence numbers you sent and route each reply back to the right caller. The idea here is that on the client side, you may have a single connection that is shared among several threads, reducing the number of overall connections you need and getting better overall utilization from the network.
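
And the matching client-side sketch: one shared connection, one background reader, and a dictionary of futures keyed by sequence number. Again this builds on the read_message / write_message helpers above, and the class shape is just an assumption for illustration.

import itertools
import threading
from concurrent.futures import Future

class SharedConnection:
    def __init__(self, wire):
        self.wire = wire
        self.next_sequence = itertools.count(1)
        self.pending = {}                         # sequence number -> Future
        self.lock = threading.Lock()              # serialize writers on the shared stream
        threading.Thread(target=self._read_loop, daemon=True).start()

    def send(self, cmd, args=(), headers=None, body=None):
        sequence = next(self.next_sequence)
        headers = dict(headers or {})
        headers["Sequence"] = str(sequence)
        future = Future()
        with self.lock:
            self.pending[sequence] = future
            write_message(self.wire, cmd, args, headers, body)
        return future                             # the caller blocks on .result() if it wants to

    def _read_loop(self):
        while True:
            cmd, args, headers, body = read_message(self.wire)
            # Replies can arrive in any order; the sequence number routes each one home.
            future = self.pending.pop(int(headers.get("Sequence", 0)), None)
            if future is not None:
                future.set_result((cmd, args, headers, body))

A thread that just wants request/reply semantics calls connection.send(...).result() and never notices the multiplexing.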

A nice property of this design is that you don’t have to do things this way. If you don’t want to support concurrent requests / replies, just have a single connection and wait to read the reply from the server whenever you make a request. That gives you the option of a simple stateful approach, but also an easy upgrade path down the line if / when you need it. The fact that the mental model of the user is request/reply is a great help, to be honest, even if this isn’t what is actually going on. This greatly reduces the amount of complexity that a user needs to keep in their head.

Some details of the protocol would need gentle massaging to ensure that a human on the command line can type reasonable commands, but that is fairly straightforward. The text based nature of the communication also lends itself nicely to tracing / audits. At the client or server level, we can write a <connection-id>.trace file that will have all the reads and writes on that connection. During debugging, you can just tail the right file and see exactly what is going on, or just zip them up and archive them for auditing.
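
One way to get that trace essentially for free is to wrap the stream itself, something along these lines (the >> / << prefixes are just an assumed convention for telling the two directions apart):

class TracingWire:
    # Wraps the underlying stream and appends every read and write to a
    # per-connection trace file that can be tailed during debugging or
    # archived for auditing.
    def __init__(self, wire, connection_id):
        self.wire = wire
        self.trace = open("%s.trace" % connection_id, "ab")

    def write(self, data):
        self.trace.write(b">> " + data)
        return self.wire.write(data)

    def readline(self, limit=-1):
        line = self.wire.readline(limit)
        self.trace.write(b"<< " + line)
        return line

    def read(self, size):
        data = self.wire.read(size)
        self.trace.write(b"<< " + data)
        return data

    def flush(self):
        self.trace.flush()
        self.wire.flush()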

Speaking of zipping, let’s consider the following command:

OPTIONS gzip

This command on the connection can do things like change how we encode the data, in this case asking the server to use gzip for all reads and writes from now on. The server can reply with a message (uncompressed) that it is now switching compression, and everything from that point forward will be compressed. Note that unlike HTTP compression, we can get the benefits of compression across multiple requests, and given that most requests / replies share a lot of the same structure, that is likely to benefit us by quite a lot.
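
The write side of that switch can be as small as the sketch below; using zlib with a sync flush per message keeps a single compression context alive across messages, which is where the cross-request savings come from (the read side mirrors this with zlib.decompressobj). The gzip framing via wbits=31 is an assumption to match the command name.

import zlib

class CompressedWriter:
    # Wraps the raw stream once both sides have agreed on OPTIONS gzip.
    def __init__(self, wire):
        self.wire = wire
        self.compressor = zlib.compressobj(wbits=31)   # gzip container

    def write(self, data):
        self.wire.write(self.compressor.compress(data))
        # A sync flush makes the bytes sent so far decodable without closing
        # the stream, while keeping the shared dictionary for later messages.
        self.wire.write(self.compressor.flush(zlib.Z_SYNC_FLUSH))

    def flush(self):
        self.wire.flush()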

The last topic I listed is the notion of push operations and how they should be handled. Given that we don’t have a strict request/reply model, there is an obvious way for the server to send additional data “out of band”. A client can request to be notified by the server of certain things, and the server will make a note of that and just send the data back at some later time. There is obviously the need to correlate the push notification to the original request, but that is what we have the headers for. A simple CorrelationId header on the original request and the push notification will be sufficient for the client side to be able to route it to the right callback.
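
A sketch of that routing on the client side, small enough to bolt onto the reader loop from the earlier sketch; the subscribe / dispatch names are made up for illustration.

class PushRouter:
    # Callbacks are registered when the original request goes out, keyed by the
    # CorrelationId header that the server echoes on every push notification.
    def __init__(self):
        self.callbacks = {}

    def subscribe(self, correlation_id, callback):
        self.callbacks[correlation_id] = callback

    def dispatch(self, cmd, args, headers, body):
        callback = self.callbacks.get(headers.get("CorrelationId"))
        if callback is not None:
            callback(cmd, args, headers, body)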

I think that this is pretty much it. This should cover enough to give you a clear idea about what is required, and I believe that it is enough for a thought exercise. There are a lot of other details that should probably be answered, for example, how do you deal with very large responses (break them into multiple messages, I would assume, to avoid holding up the connection for other requests), but that should be the gist of it.
