
Time series feature design: Scale out / high availability

I expect (at a bare minimum) to be able to do about 25,000 sustained writes per second on a single machine. I actually expect to be able to do significantly more, but let us take that amount for now as the threshold: if we drop below that value, we are in trouble. Over a day, that is 2.16 billion records. I don't think that is likely, but it is a lot of data to work with.
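Just to spell out the arithmetic behind that daily figure:

```python
# Back-of-the-envelope: sustained writes per day at the minimum expected rate.
writes_per_second = 25_000
seconds_per_day = 60 * 60 * 24              # 86,400
writes_per_day = writes_per_second * seconds_per_day
print(f"{writes_per_day:,}")                # 2,160,000,000 (about 2.16 billion)
```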

That leads to interesting questions on the scale out story. Do we need one? Well, probably not for performance reasons (said the person who hasn't written the code, much less benchmarked it), at least not if my hopes for the actual system performance come about.

However, just writing the data isn't very interesting; we also need to read and work with it. One of the really nice things about time series data is that it is very easily sharded, because pretty much all the operations you have are naturally accumulative. If you have too much data for a single server, go ahead and split it. At query time, you can easily merge it all back up again and be done with it.
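As a rough illustration of that merge step (purely a sketch, with made-up shard results and function names, not an actual API), each shard answers the same range query over its own slice of the series and the partial aggregates combine trivially:

```python
# Hypothetical sketch: merging per-shard aggregates for a time range.
# Counts, sums, mins and maxes combine naturally because the data is accumulative.
def merge_range_aggregates(shard_results):
    total_count = sum(r["count"] for r in shard_results)
    total_sum = sum(r["sum"] for r in shard_results)
    return {
        "count": total_count,
        "sum": total_sum,
        "avg": total_sum / total_count if total_count else None,
        "min": min(r["min"] for r in shard_results),
        "max": max(r["max"] for r in shard_results),
    }

# Each shard answered the same range query over its own part of the data.
shards = [
    {"count": 1_000, "sum": 52_300.0, "min": 11.2, "max": 98.4},
    {"count": 1_200, "sum": 61_750.0, "min": 9.8, "max": 101.3},
]
print(merge_range_aggregates(shards))
```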

However, a much more interesting scenario for us is the high availability / replication story. This is where things get… interesting. With RavenDB, we do async server side replication. With time series data, we can do that (and we have the big advantage of not having to worry about conflicts), but the question is how.

The underlying mechanism for all the replication in RavenDB is the notion of etags, a way to track the order of changes to the database. In time series data, that means tracking the changes that happen in some sane fashion.

It means having to track, per write, at least another 16 bytes (8 bytes for a monotonically increasing etag number, another 8 for the time that was changed). And we haven't yet spoken of things like replication of deletes, replication of series tag changes, etc. I can tell you that dealing with replication of deletes in RavenDB, for example, was no picnic.
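To make that overhead concrete, here is a minimal, purely illustrative sketch (not the actual storage format) of a 16 byte per-write bookkeeping record:

```python
import struct
import time

# Purely illustrative: the per-write replication bookkeeping discussed above.
# 8 bytes for a monotonically increasing etag + 8 bytes for the change time = 16 bytes.
REPLICATION_RECORD = struct.Struct("<QQ")       # (etag, change_time_unix_ms)

def make_replication_record(etag: int) -> bytes:
    return REPLICATION_RECORD.pack(etag, int(time.time() * 1000))

print(len(make_replication_record(etag=42)))    # 16, i.e. the extra cost per write
```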

A much simpler alternative would be to not handle this at the data level. One nice thing about Voron is that we can actually do log shipping. That would trivially solve pretty much the entire problem set of replication, because it would happen at the storage layer, and take care of all of it.

However, it does come with its own set of issues. In particular, it means that the secondary server has to be read only; it cannot accept any writes. And by that I mean that it would physically reject them. That leads to a great scale out story with read replicas, but it means that you have to carefully consider how you deal with the case of the primary server going down for a duration, or any master/master story.
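To sketch the shape of that idea (all names hypothetical; this is not Voron's actual log shipping API): the primary appends every write to a log, ships the pending tail to its secondaries, and a secondary applies shipped entries but physically rejects direct writes.

```python
# Hypothetical sketch of storage-level log shipping; not Voron's actual API.
class Primary:
    def __init__(self):
        self.store = {}
        self.log = []                      # ordered list of (position, key, value)

    def write(self, key, value):
        self.store[key] = value
        self.log.append((len(self.log) + 1, key, value))

    def ship_to(self, secondary):
        # Ship only the log entries the secondary has not applied yet.
        pending = self.log[secondary.applied_up_to:]
        secondary.apply_log_entries(pending)


class Secondary:
    def __init__(self):
        self.store = {}
        self.applied_up_to = 0             # how far into the primary's log we are

    def apply_log_entries(self, entries):
        # Replaying the shipped log in order reproduces the primary's state.
        for position, key, value in entries:
            self.store[key] = value
            self.applied_up_to = position

    def write(self, key, value):
        # The read replica physically rejects writes; only shipped log entries mutate it.
        raise PermissionError("read-only replica: writes must go to the primary")
```

That rejection is exactly what makes the read replica story easy and the failover story hard: a replica cannot just start taking writes, it has to be made the primary first.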

I am not sure that I have any good ideas about this at this point in time. I would love to hear suggestions.

Comments

tobi, 02/19/2014 10:32 AM

Maybe version and replicate chunks of data? 1000 events per chunk. Cuts down overhead by 1000x.

Updates of events should be rare, so you could store them as deltas. That would make all events immutable.

Jorge, 02/19/2014 10:47 AM

"how you deal with the case of the primary server going down for a duration" Just because only one server can accept writes at a given time doesn't mean it always has to be the same server. A bit like Active Directory FSMOs: there's only one master server, but you can designate which one it is.

Ayende Rahien, 02/19/2014 11:05 AM

Tobi, The problem is that you have the common case of writes to the end, so the last chunk is always being written to. And then there is the issue of how you define a chunk.

Ayende Rahien, 02/19/2014 11:06 AM

Jorge, Yes, you can move it around, but moving the master is not a trivial thing to do at all. You usually need a manual operation to do that.

tobi, 02/19/2014 04:00 PM

Ayende, good point. I'll drop that approach.

If I remember correctly you're storing multiple independent streams of events. You could store the last replicated position per stream (and per subscriber). You'd need an additional low-volume delta-table as well for updates. How that is implemented should not matter for throughput.

nadav, 02/20/2014 07:14 AM

Can't you use the timestamp each time series record inherently has instead of the etag?

Ayende Rahien, 02/20/2014 09:16 AM

Nadav, No, you can't. Imagine that we do an update of an entry that happened 3 weeks ago. How do we detect that?

Ayende Rahien, 02/20/2014 09:28 AM

Tobi, The problem is that you can update things backward. So you have a stream with T1, T10, T20, and you record that you replicated those. Now you have an update to it at T5. You need to replicate that as well.

tobi, 02/20/2014 09:31 AM

Ayende, I suggest that whenever you "backward update" something, you also create an entry in a special delta stream. That delta stream is immutable and you can replicate it like any other stream. Subscribers apply the deltas when they receive them. All this assumes that deltas are rare.

Ayende Rahien, 02/20/2014 09:33 AM

Tobi, That creates a lot of complexity for us. Imagine trying to troubleshoot "why wasn't this replicated?" I am not sure that this is a valid approach from an operational standpoint.

It also doesn't properly handle the scenario where you have to replicate to a new db.

tobi, 02/20/2014 09:50 AM

That's a fun problem. I'll think about this more.

chaotic-good, 02/20/2014 10:41 AM

Do you plan to shard by parameter id? If this is the case, what if one of the parameters has most of the writes, or we have only one parameter (one time series)?

Ayende Rahien, 02/21/2014 12:03 PM

Chaotic-Good, There isn't any requirement to shard by the source id (what I think you call parameter id). You can still get really good results by merging the replies from multiple servers on the same source id.