Ayende @ Rahien

Oren Eini aka Ayende Rahien CEO of Hibernating Rhinos LTD, which develops RavenDB, a NoSQL Open Source Document Database.

You can reach me by:

oren@ravendb.net

+972 52-548-6969

, @ Q j

Posts: 6,844 | Comments: 49,144

filter by tags archive
time to read 2 min | 295 words

Being able to handle replication at the storage level is a really nice feature to have. More than that, it is a feature that can be broadly applied. But… a database is a lot more than just storage. And being able to just move the data around between machines is nice, but there are other things we have to take into account.

In particular, when we replicate via storage changes, we don’t have a good way to take actions on changes. Most of the time, that means that we can’t actually rely on internal caches, and would probably have to deal with that somehow in another fashion. But there are usually secondary processing that is done on a node that would have to be accounted for.

For example, let us assume that we had the ability to replicate RavenDB (docs) changes between machines using storage replication. The problem here is that we would be replicating the documents, but not the indexes, and when we do that, we would need to index the changed documents on the destination node. However, that would actually require two data stores, one for the actual documents data, and one for all of the non replicated data (indexing, stats, etc).

In other words, I think that such a database would have to be designed specifically for that scenario.

In addition to that, it would probably be best for the storage replication to also be annotated with information for higher level code. So if you modify this range in the file, you’ll also know that you need to drop the following documents from the cache.

time to read 2 min | 317 words

So, after reaching the conclusion that replication is going to be hard, I went back to the office and discussed those challenges and was in general pretty annoyed by it. Then Michael made a really interesting suggestion. Why not put it on RAFT?

And once he explained what he meant, I really couldn’t hold my excitement. We now have a major feature for 4.0. But before I get excited about that (we’ll only be able to actually start working on that in a few months, anyway), let us talk about what the actual suggestion was.

Raft is a consensus algorithm. It allows a distributed set of computers to arrive into a mutually agreed upon set of sequential log records. Hm… I wonder where else we can find sequential log records, and yes, I am looking at you Voron.Journal.

The basic idea is that we can take the concept of log shipping, but instead of having a single master/slave relationship, we change things so we can put Raft in the middle. When committing a transaction, we’ll hold off committing the transaction until we have a Raft consensus that it should be committed. The advantage here is that we won’t be constrained any longer by the master/slave issue. If there is a server down, we can still process requests (maybe need to elect a new cluster leader, but that is about it).

That means that from an architectural standpoint, we’ll have the ability to process write requests for any quorum (N/2+1). That is a pretty standard requirement for distributed databases, so that is perfectly fine.

That is a pretty awesome thing to have, to be honest, and more importantly, this is happening at the low level storage layer. That means that we can apply this behavior not just to a single database solution, but to many database solutions.

I’m pretty excited about this.

time to read 4 min | 706 words

Armed with the knowledge about replication strategies from my previous post, we can now consider this in the context of the time series database.

We actually have two distinct pieces of data that we track. We have the actual time data, the timestamp and value that we keep track of, and we have the series information (tags, mostly).  We can consider using log shipping here, and that would give us a great way to get a read replica. But the question is why we would want to do that. It is nice to get a replica, but that replica would have to be read only. Is that useful?  It could take over, if the master is down, but that would mean that the master would have to stay down (or converted to a slave). And divergent writes are a problem.

While attractive as a cheap way to handle replication, I don’t like this very much.

So that leaves us with using a multi write partners situation. In this case, we can allow the two servers to operate in tandem. We need to have some way to resolve conflicts, and this is where things gets a bit messy.

For series data, it is trivial to just use some form of last write wins. This assumes a synchronized clock between the servers, but we’ll leave that requirement for now.

The problem is with the actual time data. Conceptually, we intend to store the information like this:

image

The problem is how do you detect conflicts. And are they really even possible. Let us assume that we want to update a particular value at time T on both servers. Server A replicates to server B, and now we need to decide how to deal with it. Ignore the value? Overwrite the value?

The important thing is that we need some predictable way to handle this that will end up with all the nodes in the cluster having the same agreed upon value. The simplest scenario, assuming a clock sync, is to use the write timestamp. But that would require us to keep the write time stamp. Currently we can use just 16 bytes for each time record. But recording the write timestamp will increase our usage to 24 bytes. That is a 50% increase just to handle conflicts. I don’t want to pay that.

The good thing about time series data is that a single value isn’t that important, and the likelihood that they will be changed it relatively small. We can just decide to say: We’ll choose a value, for example, we will choose the maximum value for that time, and be done with it. That has its own set of problems, but we’ll deal with that in a bit. We need to discuss how we deal with replication in general, first.

Let us imagine that we have 3 servers:

  • Server A replicates to B
  • Server B replicates to C
  • Server C replicates to A

We have concurrent writes to the same time value on both server A and B. For the purpose of the discussion, let us assume that we have a way to resolve the conflict.

Server A notifies Server B about the change, but server B already have a different value for that. Conflict resolution is run, and we have a new value .That value need to be replicated down stream. It goes to Server C, who then replicate it to Server A, who then replicates it to Server B? Ad infinitum?

I intentionally chose this example, but the same thing can happen with just two servers replicating to one another (master/master). And the problem here is that in order to be able to actually track this properly, we are going to need to keep a lot of metadata around, per value. While I can sort of accept the need to keep the write time (thus requiring 50% more space per value), the idea of holding many times more metadata for replication purposes than the actual data we want to replicate seems… silly at best.

Log shipping replication it is, at least until someone can come up with a better way to resolve the issues above.

time to read 3 min | 447 words

What happens when you want to do an aggregation query over very large data set? Let us say that you have 1 million data points within the range you want to query, and you want to get a rollup of all the data in the range of a week.

As it turns out, there is a relatively simple solution for optimizing this and maintaining a relatively constant query speed, regardless of the actual data set size.

Time series data has the great benefit of being easily aggregated. Most often, the data looks like this:

image

The catch is that you have a lot of it.

The set of aggregation that you can do over the data is also relatively simple. You have mean, max, min, std deviation, etc.

The time ranges are also pretty fixed, and the nice thing about time series data is that the bigger the range you want to go over, the bigger your rollup window is. In other words, if you want to look at things over a week, you would probably use a day or hour rollup. If you want to see things over a month, you will use a week or a day, over a year, you’ll use a week or a month, etc.

Let us assume that the cost of aggregation is 10,000 operations per second (just some number I pulled because it is round and nice to work with, real number is likely several orders of magnitude higher). So if we have to run this over a set that is 1 million data points in size, with the data being entered on per minute basis. With 1 million data points, we are going to wait almost two minutes for the reply. But there is really no need to actually check all those data points manually.

What we can do is actually prepare, ahead of time, the rollups on an hourly basis. That gives us a summary on a per hour basis of just over 16 thousand data points, and will result in a query that runs in under 2 seconds. If we also do a daily rollup, we move from having a million data points to less than a thousand.

Actually maintaining those computed rollups would probably be a bit complex, but it won’t be any more complex than how we are computing map/reduce indexes in RavenDB (this is a private, and simplified, case of map/reduce). And the result would be instant query times, even on very large data sets.

time to read 3 min | 513 words

I expect (at a bare minimum), to be able to do about 25,000 sustained writes per second on a single machine. I actually expect to be able to do significantly more. But let us go with that amount for now as something that if it drops below that value, we are in trouble. Over a day, that is 2.16 billion records. I don’t think this likely, but that is a lot of data to work with.

That leads to interesting questions on scale out story. Do we need one? Well, probably not for performance reasons (said the person who haven’t written the code, much less benchmarked it) at least not if my hope for the actual system performance comes about.

However, just writing the data isn’t very interesting, we also need to read and work with it. One of the really nice thing about time series data is that it is very easily sharded, because pretty much all the operations you have are naturally accumulative. You have too much data for a single server, go ahead and split it. On query time, you can easily merge it all back up again and be done with it.

However, a much more interesting scenario for us is the high availability / replication story. This is where things gets… interesting. With RavenDB, we do async server side replication. With time series data, we can do that (and we have the big advantage of not having to worry about conflicts), but the question is how.

The underlying mechanism for all the replication in RavenDB is the notion of etags, of a way to track the order of changes to the database. In time series data, that means tracking the changes that happens in some form of a sane fashion.

It means having to track, per write, at least another 16 bytes. (8 bytes for a monotonically increasing etag number, another 8 for the time that was changed). And we haven’t yet spoken of things like replication of deletes, replication of series tag changes, etc. I can tell you that dealing with replication of deletes in RavenDB, for example, was no picnic.

A much simpler alternative would be to not handle this at the data level. One nice thing about Voron is that we can actually do log shipping. That would trivially solve pretty much the entire problem set of replication, because it would happen at the storage layer, and take care of all of it.

However, it does come with its own set of issues. In particular, it means that the secondary server has to be read only, it cannot accept any writes. And by that I mean that it would physically reject them. That leads to a great scale out story with read replicas, but it means that you have to carefully consider how you are dealing the case of the primary server going down for a duration, or any master/master story.

I am not sure that I have any good ideas about this at this point in time. I would love to hear suggestions.

time to read 1 min | 143 words

We need one.  But that is pretty much all I have to say about it. Most of the ways you’ve to look at time series data end up something like this:

But before you can get here, you need to handle:

  • Looking at series
  • Inspecting raw series data
  • Tagging / searching series
  • Looking at the roll up values across different dates for different series
  • Delete data (full series or range of data)

Probably other stuff, but those are the things that I can think of.

Probably a good place to do a lot of graphs, too, just to let the users play with the data. But that is what I have in mind here so far.

Oh, and quite obviously, we want to be able to output everything to CSV so users can look at that in Excel, of course.

time to read 5 min | 868 words

We have gone over the system behavior, the wire protocol and how we actually store the data on disk. Now, let us talk about the actual client API. The entry point is going to the TimeSeries class, which will have the following behavior:

Stateless operations:

  • Queries:
    • timeSeries.Query(“sensor1.heat”, “sensor1.flow”)
         .Range(start,end)
         .Rollup(Rollup.Weekly)
         .Aggergation(AggergateBy.Max, AggergateBy.Min, AggergateBy.Mean);
    • timeSeries.SeriesBy(“temp:C”);
  • Operations:
    • timeSeries.Delete(“sensor1.heat”, start, end);
    • timeSeries.Tag(“sensor1.heat”, “temp:C”);

Those types of operations have no state, require nothing beyond just knowing where the server is located and can be immediately executed without requiring any external state. The returned results aren’t tracked or managed  by us in any way, so there is no need for a session. 

Stateful operation - The only stateful operation we have (at least so far) is adding data to the database. We do that using the connection abstraction. This is very close to the actual on the wire representation, which is always good. We have something like:

   1: using(var con = timeSeries.OpenConnection(waitForServerFlush: true))
   2: {
   3:     using(var series = con.AddToSeries("sensor1.heat"))
   4:     {
   5:         for(var i = 0; i < 100; i++) 
   6:         {
   7:             series.Add(time.AddMinutes(i), value + i);
   8:         }
   9:     }
  10: }

This is a bit of an awkward API, but it serves a purpose, it is very close to the way the on-wire format is, and it is optimized for performance, not for being nice.

We can also have:

con.Add(“sensor1.heat”, time, value);

But if you are mixing things up (add sensor1.heat, sensor1.flow and then sensor1.heat again, etc), it probably won’t be as efficient. (It is important to be able to expose those optimizations all the way from the disk to the wire to the client API. Most times, they don’t matter, which is why we have the higher level API, but when they do, they really do.

And… this is pretty much it.

The API will probably be an async one, to keep up with the times, but those are pretty much the high level things that we have here.

time to read 2 min | 251 words

It is easy to forget that a database isn’t just about storing and retrieving data. A lot of work goes into the actual behavior of the system beyond the storage aspect.

In the case of time series database, just storing the data isn’t enough, we very rarely actually want to access all the data. If we have a sensor that send us a value once a minute, that comes to 43,200 data points per month. There is very little that we actually want to do for that amount of data. Usually we want to do things over some rollup of the data. For example, we might want to see the mean per day, or the standard deviation on a weakly basis, etc.

We might also want to do some down sampling. By that I mean that we take a series whose value is stored on a per minute / second basis and we want to store just the per day totals and delete the old data to save space.

The reason that I am using time series data for this series of blog posts is that there really isn’t all that much that you can do for a time series data, to be honest. You store it, aggregate over it, and… that is about it. Users might be able to do derivations on top of that, but that is out of scope for a database product.

Can you think about any other behaviors that the system needs to provide?

time to read 10 min | 1835 words

Now that we actually have the storage interface nailed down, let us talk about how we are going to actually expose this over the wire. The storage interface we defined is really low level and not something that I would really care to try working with if I was writing user level code.

Because we are in the business of creating databases, we want to allow this to run as a server, not just as a library. So we need to think about how we are going to expose this over the wire.

There are several options here. We can expose this as a ZeroMQ service, sending messages around. This looks really interesting, and would certainly be a great research to do, but it is a foreign concept for most people, and it would have to be exposed as such through any API we use, so I am afraid we won’t be doing that.

We can write a custom TCP protocol, but that has its own issues. Chief among which, just like ZeroMQ, we would have to deal, from scratch with things like:

  • Authentication
  • Tracing
  • Debugging
  • Hosting
  • Monitoring
  • Easily “hackable” format
  • Replayability

And probably a whole lot more on top of that.

I like building the wire interface over some form of HTTP interface (I intentionally don’t use the overly laden REST word). We get a lot of things for free, in particular, we get easy ability to do things like pipe stuff through Fiddler so we can easily debug them. That also influence decisions such as how to design the wire format itself (hint, human readable is good).

I want to be able to easily generate requests and see them in the browser. I want to be able to read the fiddler output and figure out what is going on

We actually have several different requirements that we need to do.

  • Enter data
  • Query data
  • Do operations

Entering data is probably the most interesting aspect. The reason is that we need to handle inserting many entries in as efficient a manner as possible. That means reduce the number of server round trips. At the same time, we want to keep other familiar aspects, such as the ability to easily predict when the data is “safe” and the storage transaction has been committed to disk.

Querying data and handling one off operations is actually much easier. We can handle it via a simple REST interface:

  • /timeseries/query?start=2013-01-01&end=2014-01-01&id=sensor1.heat&id=sensor1.flow&aggregation=avg&period=weekly
  • /timeseries/delete?id=sensor1.heat&id=sensor1.flow&start=2013-01-01&end=2014-01-01
  • /timeseries/tag?id=sensor1.heat&tag=temp:C

Nothing much to it, so far. We expose it as JSON endpoints, and that is… it.

For entering data, we need something a bit more elaborate. I chose to use websockets for this, mostly because I want two way communication and that pretty much force my hand. The reason that I want to use two way communication mechanism is that I want to enable the following story:

  • The client send the data to the server as fast as it is able to.
  • The server will aggregate the data and will decide, base on its own consideration, when to actually commit the data to disk.
  • The client need some way of knowing when the data it just sent is actually flushed so it can rely on it.
  • the connection should stay open (potentially for a long period of time) so we can keep sending new stuff in without having to create a new connection.

As far as the server is concerned, by the way, it will decide to flush the pending changes to disk whenever:

  • over 50 ms passed waiting for the next value to arrive, or
  • it has been at least 500 ms from the last flush, or
  • there are over 16,384 items pending, or
  • the client indicated that it wants to close the connection, or
  • the moon is full on a Tuesday

However, how does the server communicate that to the client?

The actual format we’ll use is something like this (binary, little endian format):

   1: enter-data-stream =  
   2:    data-packet+
   3:  
   4: data-packet = 
   5:    series-name-len [4 bytes]
   6:    series-name [series-name-len, utf8]
   7:    entries
   8:    no-entries
   9:  
  10: entries =
  11:    count [2 bytes]
  12:    entry[count]
  13:  
  14: no-entries =
  15:    count [2 bytes] = 0
  16:  
  17: entry =
  18:    date [8 bytes]   
  19:    value [8 bytes]

The basic idea is that this format should be pretty good in conveying just enough information to send the data we need, and at the same time be easy to parse and work with. I am not sure if this is a really good idea, because this might be premature optimization and we can just send the data as json strings. That would certainly be more human readable. I’ll probably go with that and leave the “optimized binary approach” for when/if we actually see a real need for it. The general idea is that computer time is much cheaper than human time, so it is worth the time to make things human readable.

The server keep tracks of the number of entries that were sent to it for each connection, and whenever it is flushing the buffered data to disk, it will send notification to that effect to the client. The client can then decide “okay, I can notify my caller that everything that I have sent has been properly saved to disk”. Or it can just ignore it (usual case if they have a long running connection).

And that is about it as far as the wire protocol is concerned. Next, we will take a look at some of the operations that we need.

time to read 5 min | 976 words

In this post, we are going to talk about the actual design for a time series feature, from a storage perspective. For the purpose of the discussion, I am going to assume the low level Voron as the underlying storage engine.

We are going to need to store the data in a series. And the actual data is going to be relatively ordered (time based, usually increasing).

As such, I am going to model the data itself as a set of Voron trees, one tree per each series that is created. The actual data in each tree would be composed of key and value, that are each 8 bytes long.

  • Key (8 bytes): 100-nanosecond interval between Jan 1, 0001 to Dec 31, 9999.
  • Val (8 bytes): 64 bits double precision floating point number.

There can be do duplicate values for a specific time in a series. Since this represent one ten millionth of a second, that should be granular enough, I think.

Reading from the storage will always be done on a series basis. The idea is to essentially use the code from this post, but to simplify to a smaller key.  The idea is that we can store roughly 250 data points in each Voron page. Which should give us some really good performance for both reads & writes.

Note that we need to provide storage API to do bulk writes, since there are some systems that would require it. Those can either be systems with high refresh rates (a whole set of sensors with very low refresh rates) or, more likely, import scenarios. The import scenario can be either a one time (moving to a new system), or something like a nightly batch process where we get the aggregated data from multiple sources.

For our purposes, we aren’t going to distinguish between the two.

We are going to provide and API similar to:

    • void Add(IEnumerable<Tuple<string,IEnumerable<Tuple<DateTime, double>>> data);

This ugly method can handle updates to multiple series at once. In human speak, this is an enumerable of updates to a series where each update is the time and value for that time. From the storage perspective, this creates a single transaction where all the changes are added at once, or not at all. It is the responsibility of higher level code to make sure that we optimize number of calls vs. in flight transaction data vs. size of transactions.

Adding data to a series that doesn’t exists will create it.

We also assume that series names is up to printable Unicode characters of up to 256 bytes (UTF8).

The read API is going to be composed of:

  • IEnumerable<Tuple<DateTime, double>> ScanRange(string series, DateTime start, DateTime end);
  • IEnumerable<Tuple<DateTime, double[]>> ScanRanges(string []series, DateTime start, DateTime end);

Anything else would have to be done at a higher level.

There is no facility for updates. You can just add again on the same time with a new value, and while this is supported, this isn’t something that is expected.

Deleting data can be done using:

  • void Delete(string series, DateTime start, DateTime end);
  • void DeleteAll(string series);

The first option will delete all the items in range. The second will delete the entire tree. The second is probably going to be much faster. We are probably better off checking to see if the max / min ranges for the tree are beyond the items for this series and falling to DeleteAll if we can. Explicit DeleteAll will also delete all the series tags. While implicit Delete(“series-1”, DateTime.MinValue, DateTime.MaxValue) for example will delete the series’ tree, but keep the series tags.

Series can have tags attached to it. Those can be any string up to 256 bytes (UTF8). By conventions, they would usually be in the form of “key:val”.

Series can be queried using:

  • IEnumerable<string> GetSeries(string start = null);
  • IEnumerable<string> GetSeriesBy(string tag, string start = null);
  • IEnumerable<string> GetSeriesTags(string series);

Series can be tagged using:

  • void TagSeries(string series, string []tags);

There can be no duplicate tags.

In summary, the interface we intend to use for storage would look roughly like the following:

public interface ITimeSeriesStorage
{
    void Add(IEnumerable<Tuple<string,IEnumerable<Tuple<DateTime, double>>> data);
    IEnumerable<Tuple<DateTime, double>> ScanRange(string series, DateTime start, DateTime end);
    IEnumerable<Tuple<DateTime, double[]>> ScanRanges(string []series, DateTime start, DateTime end);
    void Delete(string series, DateTime start, DateTime end);
    void DeleteAll(string series);
    IEnumerable<string> GetSeries(string start = null);
    IEnumerable<string> GetSeriesBy(string tag, string start = null);
    IEnumerable<string> GetSeriesTags(string series);
    void TagSeries(string series, string []tags);
}

Data sizes – assume 1 value per minute per series, that gives us an update rate of 1,440 updates per day or 525,600 per year. That means that for 100,000 sensors (not an uncommon amount) we need to deal with 52,560,000,000 data items per year. This would probably end up being just over 3 GB or so. Assuming 1 value per second, that gives us 86,400 values per day, 31,536,000 per year and 3,153,600,000,000 values per year for the 100,000 sensors will user about 184 GB or so. Those seems to be eminently reasonable values for the data size that we are talking about here. 

Next, we’ll discuss how this is all going to look like over the wire…

FUTURE POSTS

  1. Backup goes wild - 3 hours from now

There are posts all the way to May 24, 2019

RECENT SERIES

  1. Reviewing Sled (3):
    23 Apr 2019 - Part III
  2. RavenDB 4.2 Features (5):
    21 Mar 2019 - Diffing revisions
  3. Workflow design (4):
    06 Mar 2019 - Making the business people happy
  4. Data modeling with indexes (6):
    22 Feb 2019 - Event sourcing–Part III–time sensitive data
  5. Production postmortem (25):
    18 Feb 2019 - This data corruption bug requires 3 simultaneous race conditions
View all series

Syndication

Main feed Feed Stats
Comments feed   Comments Feed Stats