Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

time to read 4 min | 639 words

One of the benefits of having a product in the market for a decade is that you gain some experience in how people are using it. This leads to interesting design decisions over time. Some of them are obvious, such as the setup process for RavenDB. Some aren’t, such as the surface of the session. It is kept small and focused on CRUD operations to make it easy to understand and use in the common cases.

And sometimes, the design is in the fact that the code isn’t there at all. Case in point, the notion of connection strings in RavenDB 4.0. This feature was removed in its entirety in this release and users are expected to provide the connection parameters to the document store on their own. How they do that is not something that we concern ourselves with. A large part of the reasoning behind this decision was around our use of X509 certificates for authentication. In many environments there are strict rules about the usage and deployment of certificates, and having a connection string facility would force us to always chase the latest of those rules. For that matter, where you store the connection string is also a problem. We have seen configuration stored in app.config, environment variables, json configuration, DI configuration and more. And each time we were expected to support this new method of getting the connection string. By not having any such mechanism, we are able to circumvent the problem entirely.

This sounds like a copout, but it isn’t. Consider this thread in the RavenDB mailing list. It talks about how to set up RavenDB 4.0 in Azure in a secure manner. Just reading the title of the thread made me cringe, thinking that this was going to be a question that would take a long time to answer (setup time, mostly). But that isn’t it at all. Instead, this is a walkthrough showing how you can set things up properly in an environment where you cannot load a certificate from a file and need to load it directly from the Azure certificate store.

This is quite important, since this is one of the things that I keep having to explain to team members. We want there to be a very clear demarcation between the kinds of things that we support and the kinds we don’t. Mostly because I’m not willing to do a half-assed job of supporting things. So saying something like “Oh, we’ll just support a file path and we’ll let the user do the rest for more complex stuff” is not going to fly with this design philosophy.

If we do something, a user reasonably expects us to do a complete job of it and puts the entire onus of responsibility on us. On the other hand, if you don’t do something, there is usually no expectation that you’ll handle that. There is also the issue that in many cases, solving the general problem is nearly impossible while solving a particular user scenario is trivial. So letting them have full responsibility works much better. At a minimum, they don’t need to circumvent the things we do for the stuff that we do support, but can start from a clear ground.

Coming back to the certificate example, if we had both a Certificate property and a CertificatePath property, allowing for easy setup in the common scenario, then it would be easy down the line to just assume that CertificatePath is set whenever we have a certificate. Suddenly, a user that doesn’t load the certificate from a file needs to be aware of this and handle the issue. If there is no such property, the behavior is always going to be correct.

time to read 5 min | 808 words

In the process of working on RavenDB 4.0, we are going over our code and looking for flaws. Both in the actual implementation and in the design of the API. The idea is to clear away the things that we know are bad in practice. And that leads us to today’s topic. Subscriptions.

Subscriptions are a way to ask RavenDB to give us, in a reliable way, all documents that match certain criteria. Here is what the code looks like in RavenDB 3.5:

You typically write this in some form of background processing / job style. The key aspect here is that RavenDB is responsible for sending the data to the subscription, and making sure that you will not lose any updates. So the code is resilient to client errors, connection errors, database restarts, etc.

However, the topic today isn’t actually subscriptions, it is their API. In practice, we noticed several deficiencies in the API above. To start with, the subscription actually started in an async manner, so opening the subscription and subscribing to it might actually be racy (it is possible to subscribe and start getting documents from the server before you attach your delegate to handle those documents). We had to write a bit of code to handle that scenario, and it was complex in the presence of adding / removing subscriptions when the subscription was live.

The other problem was that subscriptions are reliable. This means that if there is an error, the subscription will handle it. A disconnect from the server will automatically reconnect, and as far as the caller is concerned, nothing has really changed. It isn’t aware of any network errors or recovery that the subscription is doing.

Which is all well and good, until you realize that your connection string is wrong, and the subscription is going to forever retry to connect to the wrong location. We have a way to report errors to the caller code, but in most cases, the caller doesn’t care, the subscription is going to handle it anyway. So we have a problem. How do we balance both the need to handle errors and recover internally and let the caller know about our state?

We currently have the OnError() method we’ll call on the caller to let it know about any errors that we have, but that is not really helping it. The caller doesn’t have a good way to know if we can recover or not, and asking the caller to implement an error recovery policy is something that we don’t really want to do. This is complex and hard to do, and not something that the client should do.

What we ended up with is the following. First, we changed the API so you can’t just add / remove observers to a subscription. You create a new subscription, you attach all the observers that you want, and then you explicitly demarcate the stage in which the subscription starts. Having such an explicit stage frees us from the concurrency and race conditions that we previously had to handle. But it also gives us a chance to actually report something explicit to the calling code.

Let’s look at code, then discuss it:

The idea is that if you wait on the task returned from StartAsync, which is very natural to do, you can tell whether the first connection attempt was successful or not. Then you can make a determination on what to do next. If there is an error, you can dispose the subscription and report it upward, or you can let it recover automatically. This still doesn’t cover some scenarios, unfortunately. We also report all errors to the subscribers, which can make their own determination there.
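To make the shape of this concrete, here is a minimal sketch of the "attach observers first, then start explicitly" pattern. It is not the RavenDB client code: ReliableSubscription, Dispatch and SubscriptionExample are names I made up for illustration, and the connect delegate stands in for whatever the first connection attempt actually does.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical type, for illustration only; this is not the RavenDB client API.
public sealed class ReliableSubscription<T> : IDisposable
{
    private readonly List<Action<T>> _observers = new List<Action<T>>();
    private readonly Func<Task> _connect; // stands in for the first connection attempt
    private bool _started;

    public ReliableSubscription(Func<Task> connect)
    {
        _connect = connect;
    }

    // Observers can only be attached before the subscription starts, so there
    // is no window in which documents arrive while nobody is listening.
    public void Subscribe(Action<T> observer)
    {
        if (_started)
            throw new InvalidOperationException("Observers must be attached before StartAsync.");
        _observers.Add(observer);
    }

    // The returned task completes (or faults) with the outcome of the first connection.
    public Task StartAsync()
    {
        if (_started)
            throw new InvalidOperationException("The subscription was already started.");
        _started = true;
        return _connect();
    }

    // The (omitted) network loop would call this for each incoming document.
    public void Dispatch(T document)
    {
        foreach (var observer in _observers)
            observer(document);
    }

    public void Dispose()
    {
        _observers.Clear();
    }
}

public static class SubscriptionExample
{
    public static async Task RunAsync(Func<Task> connect)
    {
        var subscription = new ReliableSubscription<string>(connect);
        subscription.Subscribe(doc => Console.WriteLine("Got " + doc));
        try
        {
            await subscription.StartAsync();
        }
        catch (Exception)
        {
            // The first connection failed: give up and report upward. The other
            // choice is to keep the subscription and let it retry on its own.
            subscription.Dispose();
            throw;
        }
    }
}
```

Because nothing can be attached after StartAsync, the sketch needs no locking around the observer list, which is the simplification the explicit start stage buys.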

The problem is that there are some errors that we just can’t recover from. If a user’s password has changed, eventually we’ll recycle the connection, and get an unauthorized error. There is nothing that the subscription can do to fix that, but at the same time, there is very little chance for it to know that (a bad password is easy, a firewall rule blocking access is very hard to distinguish from the db being down, and in either case there is nothing that the subscription can do).

We are thinking about adding some sort of limit, something like: if we retried a certain number of times, or for a certain duration, and couldn’t recover, we’ll report it through an event / method that must be registered (otherwise we’ll just throw it upward and maybe crash the process). The idea is that in this case, we need someone to pay attention, and an unhandled exception (if you didn’t register to catch it) would be appropriate.
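Since this is still just an idea, here is a rough sketch of what I mean, limited to the "certain number of times" variant. RetryPolicy and OnUnrecoverableError are hypothetical names, not an existing API, and a real implementation would wait between attempts instead of retrying immediately.

```csharp
using System;

// Hypothetical sketch; the names are illustrative and not part of RavenDB.
public sealed class RetryPolicy
{
    private readonly int _maxAttempts;

    // If nobody registers here, the final failure is thrown at the caller.
    public event Action<Exception> OnUnrecoverableError;

    public RetryPolicy(int maxAttempts)
    {
        _maxAttempts = maxAttempts;
    }

    public void Run(Action connect)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                connect();
                return;
            }
            catch (Exception) when (attempt < _maxAttempts)
            {
                // Still under the limit: swallow the error and try again.
            }
            catch (Exception e)
            {
                // Limit reached: someone has to pay attention now.
                var handler = OnUnrecoverableError;
                if (handler == null)
                    throw new InvalidOperationException(
                        "Gave up after " + _maxAttempts + " attempts.", e);
                handler(e);
                return;
            }
        }
    }
}
```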

I would like to get some feedback on the idea, before going ahead and implementing it.

time to read 3 min | 452 words

In my previous post, I explained an API design that gives the user the option to perform an immediate operation, use the default fire and forget, or use an explicit bulk mechanism. The idea is that most operations are small, and that the cost of actually going over the network is going to dominate the cost of the entire operation. In this case, we want to give the user the option of selecting the “I want to know the operation completed” or “I just want to try the best, I’m fine if there is a failure” modes.

Eli asks:

Trying to understand why this isn't just a scripting case. In your example you loop over CSV and propose an increment call per part which gets batched up and committed outside the user's control. Why not define a JavaScript on the server that takes an array ("a batch") of parts sent over the wire and iterates over it on the server instead? This way the user gets fine grained control over the batch. I'm guessing the answer has something to do with your distributed counter architecture...

If I understand Eli properly, the idea is that we’ll just expose an API like this:

Increment(“users/1/visits”, 1);

And provide an endpoint where a user can POST JavaScript code that will call this. The idea is that the user will be able to decide whether to call this once, or send a whole batch of updates in one go.

This is certainly an option, but in my considered opinion, it is a pretty bad one. It has nothing to do with the distributed architecture, it has to do with the burden we put on the user. The actual semantics of “go all the way to the server and confirm the operation” vs “let us do a bulk insert kind of thing” are pretty easy. Each of them has a pretty well defined behavior.

But what happens when you want to do an operation per page view? From the point of view of your code, you are making a single operation (incrementing the counters for a particular entity). From the point of view of the system as a whole, you are generating a whole lot of individual requests that would be much better off as a single bulk request.

Having a scripting endpoint gives the user the option of doing that, sure, but then they need to handle:

  • Error recovery
  • Multi threading
  • Flushing on batch size / time
  • Failover

And many more that I’m probably forgetting. By providing the users with the option of making an informed choice about speed vs. safety, we avoid putting the onus of the actual implementation on them.

time to read 3 min | 552 words

In RavenDB 4.0 (yes, that is quite a bit away), we are working on additional data types and storage engines. One of the things that we’ll add, for example, is the notion of gossiping distributed counters. That doesn’t actually matter for our purposes here, however.

What I wanted to talk about today is the problem in making small updates over a network. Consider the counters example. By far the most common operations are some variant of:

counters.Increment(name, 1);

Since the database is remote, we need to send that over the network. But that is a very small operation. Going all the way to the remote database just for that is a big waste of time. Consider that in most systems, we don’t have just a single thread of execution, we have many. Each of them performing their own operations. Allowing each operation to go on its own is a big waste, but what other options can we offer?

There are other considerations as well. While most of the time we’ll be making small operations, it is very common to need to do bulk operations as well. Maybe you are setting up the system for the first time, or importing data from a file, etc. Making a large number of individual requests will kill any hope of doing this fast. The obvious answer is to use batching: send a lot of values to the server all at once. This reduces the network overhead significantly.

The API for that would be something like:

using(var batch = counters.Advanced.NewBatch())
{
	// each line is expected to look like: name,delta
	foreach(var line in File.ReadLines("counters.csv"))
	{
		var parts = line.Split(',');
		batch.Increment(parts[0], long.Parse(parts[1]));
	}
}

So that is great for big things, but not very helpful for individual operations. We still have to go to the server for each of those.

Of course, we could also use the batch API, but making use of that for an individual operation is… a big waste.

What to do?

The end result we arrived at was that we have three levels of API:

  • counters.Increment(name, 1); – single operation, executed immediately over the network. Guaranteed to succeed or fail and give you the result.
  • counters.Advanced.NewBatch() – batch operation, executed over all of the items in the batch (but not as a single transaction), will let you know if the whole operation succeeded, or if there was an issue with something.
  • counters.Batch.Increment() – the default batch, thread safe, can be utilized by individual requests. This is a fire & forget operation. We increment, and behind the scenes we’ll merge all the increments from all the threads and send them in batches to the server.

Note that the last option means that we’ll only do batches, so only when enough time has elapsed or we have enough items to send will we send the data to the server. The idea is that you get to choose, based on the importance of what you are doing.

If you need confirmation that something was successful, use the individual operation. If you just want us to make best effort, and if something bad really happened you don’t care about it, use the batch option.
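To give a feel for what the default batch does behind the scenes, here is a minimal sketch. IncrementBatcher is a made-up name and this is not the actual implementation; the send delegate stands in for the network call, and the time-based flush is only hinted at by the Flush method that a timer would call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the "default batch" idea; not RavenDB code.
public sealed class IncrementBatcher
{
    private readonly object _lock = new object();
    private readonly int _maxItems;
    private readonly Action<IReadOnlyDictionary<string, long>> _send;
    private Dictionary<string, long> _pending = new Dictionary<string, long>();

    public IncrementBatcher(int maxItems, Action<IReadOnlyDictionary<string, long>> send)
    {
        _maxItems = maxItems;
        _send = send;
    }

    // Fire and forget: merge the delta into the pending batch, and flush
    // once the batch holds enough distinct counters.
    public void Increment(string name, long delta)
    {
        Dictionary<string, long> toSend = null;
        lock (_lock)
        {
            _pending.TryGetValue(name, out var current); // 0 when the name is new
            _pending[name] = current + delta;
            if (_pending.Count >= _maxItems)
                toSend = Swap();
        }
        // Send outside the lock so other threads can keep incrementing.
        if (toSend != null)
            _send(toSend);
    }

    // A timer would call this so that a small batch is not held forever.
    public void Flush()
    {
        Dictionary<string, long> toSend;
        lock (_lock)
            toSend = Swap();
        if (toSend.Count > 0)
            _send(toSend);
    }

    // Must be called while holding _lock.
    private Dictionary<string, long> Swap()
    {
        var full = _pending;
        _pending = new Dictionary<string, long>();
        return full;
    }
}
```

Note that increments to the same counter from different threads collapse into a single entry, which is where most of the network savings come from. Error handling is deliberately left out; in fire & forget mode the caller never sees a failed send.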

Note that I’m using the counters example here because it is simple, but the same applies for things like time series, and other stuff that we are currently building.

time to read 2 min | 225 words

This post concludes this week’s series on API design choices regarding how to handle partial failure scenarios in a sharded cluster. In my previous post, I discussed my issues with a local solution for the problem.

The way we ended up solving this issue is actually quite simple. We apply a global solution to a global problem: we added the ability to inject error handling logic deep into the execution pipeline of the sharding implementation, like this:

image

In this case, as you can see, we allow requests to fail if we are querying (because we can probably still get something useful from the other servers), but if you are requesting something by id and it generates an error, we will propagate this error. Note that in our implementation, we call a user-defined “NotifyUserInterfaceAboutServerFailure”, which will let the user know about the error.

That way, you probably have some warning in the UI about partial information, but you are still functional. This is the proper way to handle this, because you are handling this once, and it means that you can handle it properly, instead of having to do the right thing everywhere.
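As a rough illustration of handling this once, in one place, here is a sketch of a single error hook that every sharded request goes through. ShardedExecutor, ShardOperation and the shape of OnError are names I am making up here, not the actual sharding API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types, for illustration only; not the RavenDB sharding API.
public enum ShardOperation { Query, LoadById }

public sealed class ShardedExecutor
{
    // Return true to swallow the error and carry on with the other shards,
    // false to propagate it to the caller. The default propagates everything.
    public Func<ShardOperation, string, Exception, bool> OnError { get; set; }
        = (operation, shard, error) => false;

    public List<T> Execute<T>(
        ShardOperation operation,
        IReadOnlyDictionary<string, Func<IEnumerable<T>>> shards)
    {
        var results = new List<T>();
        foreach (var shard in shards)
        {
            try
            {
                results.AddRange(shard.Value());
            }
            catch (Exception e)
            {
                // The single decision point for every sharded request.
                if (!OnError(operation, shard.Key, e))
                    throw;
            }
        }
        return results;
    }
}
```

The policy described above would then be a handler that returns false for anything other than ShardOperation.Query, and otherwise calls the user-defined NotifyUserInterfaceAboutServerFailure and returns true.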

time to read 4 min | 634 words

Still going on with the discussion on how to handle failures in a sharded cluster, we are back to the question of how to handle the scenario of one node in a cluster going down. The question is, what should be the system behavior in such a scenario.

In my previous post, I discussed one alternative option:

ShardingStatus status;
var recentPosts = session.Query<Post>()
          .ShardingStatus( out status )
          .OrderByDescending(x=>x.PublishedAt)
          .Take(20)
          .ToList();

I said that I really don’t like this option. But deferred the discussion on exactly why.

Basically, the entire problem boils down to a very simple fact, manual memory management doesn’t work.

Huh? What is the relation between handling failures in a cluster to manual memory management? Oren, did you get your wires crossed again and continued a different blog post altogether?

Well, no. It is the same basic principle. Requiring users to add a specific handler for this results in several scenarios, none of them ideal.

First, what happens if we don’t specify this? We are back to “ignore & swallow the error” or “throw and kill the entire system”.

Let us assume that we go with the first option, the developer has a way to get the error if they want it, but if they don’t care, we will just ignore this error. The problem with this approach is that it is entirely certain that developers will not add this, at least, not before the first time we have a node fail in production and the system will simply ignore this and show the wrong results.

The other option, throwing an exception if the user didn’t ask for the sharding status and we have a failing node, is arguably worse. We now have a ticking time bomb. If a node goes down, the entire system will go down. The reason that I say that this is worse than the previous option is that the natural inclination of most developers is to simply stick the ShardingStatus() there and “fix” the problem. Of course, this is basically the same as the first option, but this time, the API actually led the user down the wrong path.

Second, this is forcing a local solution on a global problem. We are trying to force the user to handle errors at a place where the only thing that they care about is the actual business problem.

Third, this alternative doesn’t handle scenarios where we are doing other things, like loading by id. How would you get the ShardingStatus from this call?

session.Load<Post>("tri/posts/1");

Anything that you come up with is likely to introduce additional complexity and make things much harder to work with.

As I said, I intensely dislike this option. A much better alternative exists, and I’ll discuss this in the next post…

time to read 2 min | 276 words

Still going on with the discussion on how to handle failures in a sharded cluster, we are back to the question of how to handle the scenario of one node in a cluster going down. The question is, what should be the system behavior in such a scenario.

In the previous post, we discussed the option of simply ignoring the failure, and the option of simply failing entirely. Both options are unpalatable, because we either transparently hide some data from the user (the data which resides on the failing node) or we take the entire system down when a single node is down.

Another option that was suggested in the mailing list is to actually expose this to the user, like so:

ShardingStatus status;
var recentPosts = session.Query<Post>()
          .ShardingStatus( out status )
          .OrderByDescending(x=>x.PublishedAt)
          .Take(20)
          .ToList();

This will give us the status information about potentially failing shards.

I intensely dislike this option, and I’ll discuss the reasons why in the next post. In the meantime, I would like to hear your opinion about this API choice.

time to read 2 min | 246 words

In my previous post, I discussed how to handle partial failure in a sharded cluster scenario. This is particularly hard because this is the case where one node out of a cluster of 3 (or more) nodes is failing, and it is entirely likely that we can still give the user at least partial service.

The most obvious, and probably easiest, option is to simply catch and log the error for the failing server and not notify the calling application code about this. The nice thing about this option is that if you have a failing server, you don’t have your entire system go down, and can handle this at least partially.

The sad part about this option is that there really is a good chance that you won’t notice that some part of the system is down, and that you are actually returning only partial results. That can lead to some really nasty scenarios, such as the case where we “lose” an order, or a payment, and we don’t show this to the user.

That can lead to some really frustrating scenarios where a customer is saying “but I see the payment right here” and the help desk says “I am sorry, but I don’t see it, therefore you don’t get any service, have a good day, bye”.

Still… that is probably the best case scenario, considering that the alternative is the entire system being unavailable if any single node is down.

Or is it… ?

time to read 1 min | 187 words

An interesting question came up recently. How do we want to handle sharding failures?

For example, let us say that I have a 3-node cluster of RavenDB, serving posts for a blog (just to give some random example). The way the sharding has been set up, we are sharding using round robin based on posts (so each post goes to a different machine, and anything related to a post goes to the same node as the post). Here is how it can be set up:

image

Now, we want to display the main page, and we would like to show the most recent posts. We can do this using the following code:

image

The question is, what would happen if the second server is offline?

I’ll give several alternatives in the next few posts.
