Ayende @ Rahien

Refunds available at head office

Finding a port in a storm: Or how to get a consistent build experience

RavenDB is a server product, as such, we are obvious going to have to talk over the network. That is great, except that it means we need a system wide resource, a TCP port. And that isn’t so great, because it turn out that there are a lot of things that can try to grab a port and use it.

And I don’t speak about things like Skype taking over port 80. I’m talking about the use of dynamic or ephemeral ports. That means that during the build, we can have something (for example, Chrome, or DropBox, or anything that uses the web) make a call and suddenly that port is busy and you can’t bind to it.

The default ports we use for tests are in the 8070 – 8090 range, but we have had consistent build failures because something is taking the ports. Finally I took the time to investigate, and I discovered that:

   1: C:\Work\ravendb-3.0 [master]> netsh int ipv4 show dynamicport tcp
   2:  
   3: Protocol tcp Dynamic Port Range
   4: ---------------------------------
   5: Start Port      : 1025
   6: Number of Ports : 64510

So, on the two machines I tested this on, we had the dynamic port range cover everything from port 1025 and up. This sucks. And probably explains the issue. This is how we fixed it (I hope):

   1: netsh int ipv4 set dynamicportrange tcp startport=10000 numberofports=55535

Basically, everything below port 10,000 is free for us to use. And yes, this post is here mostly so I’ll not have to remember it in the future.

Tags:

Published at

Originally posted at

Comments (3)

RavenDB Conf: Success!

Now that the conference is over ,and I am merely doing 3 back to back RavenDB courses in 3 cities and 2 continents, I can sit back and look at that.

I was, in a word, a blast. We had eight speakers giving 14 talks, about topics that moved from date and time handling in RavenDB to operational concerns to architectural how to. But, to be honest, I think that I found the attendees to be the main attraction. It was really good to finally meet so many of the people we have been talking with over the past few years. And what people are building with RavenDB is flat out amazing, and just a little bit scary. It is a funny thing when I see people take our tool and put it to uses that we have never even imagined.

Another great part was the hackaton. We had, in the space of a few hours, produced a major new feature for RavenDB (distributed counters), with everything wired up: Storage, Wire Protocol and even the UI. We still need to complete the replication bits, but those are coming.

We have recorded the sessions, and we hope to have the edited videos soon. I wanted to take the time and thank the speakers and the attendees for such a great conference .

We’ll be seeing you again soon… Smile.

Tags:

Published at

Originally posted at

Comments (5)

RavenConf, day 1

I started the day at 5:30 AM, because we had to go to the venue and get everything ready. This is now 01:30 AM for the next day, and I am still stoked.

I took this picture at around 8 AM, when people started to queue up to go into the conference:

image

For that matter, you can see a few other photos from the conference here. I’ll do a retrospect about the conference when I have time to breath. But for now, I can report that the RavenDB Hackaton was quite a success, we have a cool new feature mostly implemented. You can see it here: https://github.com/ayende/ravendb/tree/duco

And yes, I’ll talk about that in the future as well. But right now, I’m going to have a db capable of managing a 100 billion records tomorrow, live on stage, so I probably need to get some sleep done…

Published at

Originally posted at

Comments (3)

RavenDB 3.0 milestone

Well, today we have reached a really big milestone for RavenDB 3.0. It is now the master branch in our development mode, and 2.5 is in maintenance mode.

That means that you can go here: http://github.com/ayende/ravendb and see all the cool stuff that we are doing. Official builds will be up for the conference, so you can play with the new stuff Smile.

Tags:

Published at

Originally posted at

Comments (5)

Last RavenDB Conference tickets

We still have a couple of tickets available for the RavenDB conference. And sale of them ends tomorrow. You can buy them here.

I’m already at the states (writing this from JFK right now), and getting ready to show you some very cool stuff next week.

Tags:

Published at

Originally posted at

My distributed build system

Yes, I know that you are probably getting geared up to hear about some crazy setup, and in some manner, it is crazy. My distributed build system is this:

IMG_20140402_103433

Yep, that is me manually distributing the build to do a cross check on a reasonable time frame.

I’ve mentioned before that our build time is atrocious. With over 3,500 tests, this is no longer a feasible alternative to just run them normally. So we started parallel efforts (pun intended), to reduce the time it takes the system to build and reduce individual test times as well as the ability to parallelize things.

We are actually at the point where we can run concurrent tests, even those we previously had to distribute. And we can even do that with replicated tests, which are the ones usually taking the longest. But what we’ll probably have in the end is just a bunch of test projects (currently we have ~5) that we will run on different machines at the same time.

We are using Team City for build system, and I know that it has capabilities in this regard, but I’ve never actually looked into that. We’re going to be pretty busy in the next couple of weeks, so I thought that this would be a good time to ask.

My current thinking is:

  • One build that would actually do the compilation, etc.
  • Then branch off that to a build per test project, which can run on different agents
  • Then a build on top of that that can actually deploy the build.

Any recommendations on that? We are going to need 5 – 7 agents to run the tests in parallel. Any recommendations on how to get those? Ideally I would like to avoid having to pay for those machines to sit idle 60% – 80% of the time.

Any other alternatives?

Reduce ^ 2 in RavenDB

An interesting question that keeps popping up is how to re-reduce the results of a map/reduce. That is really nice feature on the surface, but it has a lot of implications, for example, when / how you run the 2nd reduce, can you chain only 1 time, or multiple times , what happens when there are a lot of reduce results, etc.

But most of the time, what people want is to be able to do aggregation on the map/reduce results without too much hassle, and they don’t have a lot of aggregated results or they are fine with waiting for them if they are very large. And we have a really nice solution for that scenario.

You start by defining the base map/reduce operation, like so:

image

Note that we need to also output the fields that we care about reducing further. In this case, we start by reducing to postal code, but we keep the city, region and country options as well.

Then, we define a transformer. Note that this is a special transformer, in that it has a group by in it, and it takes some parameters from outside.

image

Using those two together, we can now get the following results…

Raw map/reduce output:

image

With loc = City, we get:

image

With loc = Country, we get:

image

Tada, we have reduced further the result of a map/reduce operation. Now, this is subject to the usual limitations of RavenDB paging, in that it will only go through the only 1024 results. That can be a problem, but that is why RavenDB has the Streaming API.

You can use streaming on a map/reduce index with a transformer (and even apply parameters on top of that). That end up giving you the ability to run a re-reduction on top of a map/reduce index regardless of size.

Of course, on very large result sets, that can take quite a while, but that is expected and usually fine. For that matter, if you need to, you can chain the stream into a bulk insert, and get the re-reduction in that manner.

Tags:

Published at

Originally posted at

Comments (4)

Differences in Map/Reduce between RavenDB & MongoDB

Ben Foster has a really cool article showing some of the similarities and differences between MongoDB & RavenDB with regards to their map/reduce implementation.

However, there is a very important distinction that was missed. Map/reduce operations are run online in MongoDB, that means that for large collections, map/reduce is going to be very expensive. MongoDB has the option of taking the result of a map/reduce operation and writing it to a collection, so you don’t need to run map/reduce jobs all the time. However, that is a snapshot view of the data, not a live view. Ben mentioned that you can do something called incremental map/reduce, but that isn’t actually really good idea at all.

Let us look at the following sequence of operations:

   1: db.items.insert({name: 'oren', ts: 1 });
   2: db.items.insert({name: 'ayende', ts: 2});
   3:  
   4: var map = function Map() { emit(this.name,null); };
   5: var reduce = function(key, val) { return key; };
   6:  
   7: db.items.mapReduce(map,reduce, { out: 'distinct_item_names' });

This creates two items, and give me the distinct names in a separate collection. Now, let us see how that works with updates…

   1: db.items.insert({name: 'eini', ts: 3 });
   2:  
   3: db.items.mapReduce(map,reduce, { out: {reduce: 'distinct_item_names'}, query: {ts: {$gt: 2} } });

This is actually nice, mongo is able to merge the previous results with the new results, so you only have to do the work on the new data. But this has several implications:

  • You have to kick something like ‘ts’ property around to check for new stuff. And you have to _udpate_ that ts property on every update.
  • You have to run this on a regular basis yourself, mongo won’t do that for you.
  • It can’t work with deletes.

It is the last part that is really painful:

   1: db.items.remove({name: 'oren'});

Now, there is just no way for you to construct a map/reduce job that would remove the name when it is gone.

This sort of thing works very nicely when what you want is to just append stuff. That is easy. It is PITA when we are talking about actually using it for live data, that can change and be modified.

Contrast that with the map/reduce implementation in RavenDB:

  • No need to manually maintain state, the database does it for you.
  • No need to worry about updates & deletes, the database does it for you.
  • No need to schedule map/reduce job updates, database does it for you.
  • Map/reduce queries are very fast, regardless of data size.

To be frank, the map/reduce implementation in RavenDB is complex, and pretty much all of it comes down to the fact that we don’t do stupid stuff like run a map/reduce operation on a large database on every query, and that we support edge cases scenarios like data that is actually updated or deleted.

Naturally I’m biased, but it seems to me that trying to use map/reduce in Mongo just means that you have to do a lot of hand holding yourself, while with RavenDB, we take care of everything and leaving you to actually do stuff.

Tags:

Published at

Originally posted at

Comments (10)

Everything falls down in its proper place

As we are getting ready for the RavenDB Conference, our team is actually starting to show the fruits of all of this hard work. For example, take a look at the new studio replication UI configuration. There are a lot of things like that, and I am really pumped to see all the disparate things that we have been doing over the past year come together and join up into a single and very pretty picutre.

image

Tags:

Published at

Originally posted at

Comments (7)

More xUnit tweaks, dynamic test skipping

For a long time, xUnit’s dev has resisted adding support for skipping a test dynamically. You could create your own Fact class and handle that yourself, but that was quite a lot of work, for something very specific.

In RavenDB, we had the need to dynamically decide whatever the test can run based on the actual test situation, so I decided to add this to our xUnit fork. This turned out to be really simple to do. Just three lines of code Smile

https://github.com/ayende/xunit/commit/82accb4c850a3938187ac334fb73d6e81dc921e3#diff-3c2b9f2cb8392f32456d0bf81151b59fR57

RavenDB Conference Status Update

noname

As you probably know, we have the RavenDB Conference coming up in a few weeks. This is the very first conference that we are doing, and to say that we are excited would be a major understatement. We are going to have speakers talk about RavenDB from all angles. From the development team (including yours truly) to the operation guys behind RavenHQ as well as customers and consultants that can share their real world experience about how to best utilize RavenDB.

The idea is to have an intimate gathering, and for the very first time, to actually have direct way to talk with all of our users, as well as share a lot of the stuff that happens behind the scenes. Not to mention, we crave feedback, and several key features of RavenDB were actually proposed by the community and then implemented by us.

I was reviewing the registration numbers for the conference and it looks like we have made a… miscalculation. The idea with the conference was to charge just enough to ensure a commitment to come, to avoid the plague of free conferences where many people sign up and most never come. We even offset things so you can pay 89$ for the conference and you get a 90$ coupon for RavenDB. On the side, since we are already there, we also setup an additional 3 days course for an in-depth look at RavenDB.

But we had a lot more demand for the conference and the course than we thought we would have. We currently have just 2 places open for the course, and about 15 – 20 places open for the conference.

Since the course is more expensive, that was a the kind of surprise that is actually kinda nice to have: “Wait, you mean that they are giving us more money than we thought they would…”

At any rate, I find myself in the strange situation of encouraging people to go and buy the cheaper product, simply because we are really going to have no more room for the course soon.

You can register here, I’m looking forward to seeing you all.

Tags:

Published at

Originally posted at

Comments (1)

Raft, as the Raven flies

The interesting thing about the etcd / go-raft review wasn’t so much what they did, they did things quite beautifully, but the things that I wanted them to do that they didn’t do. The actual implementation was fascinating, and for the purposes of what is required, more than adequate. I, however, have a very specific goal in mind, and that means that this is really not going to work with the given implementations.

I’ve better explain first. I’m not sure if go-raft was written by the same people as etcd, but they do “smell” very much the same. It just occurred to me that yes, they are actually both on github, so I checked it appears that I was correct. xiangli-cmu, philips, benbjohnson and bcwaldon (among many others) have worked on both projects. The reason this is important is that this reinforce my feeling that go-raft was written mainly to serve the need of etcd. Anyway…

The reason that I think that is that the implementation is very much optimized at the level etcd needs it. What I mean is that there is a lot of work there to support multiple concurrent operations that would be batched to all the nodes in the cluster, then be applied in the in memory model. This work because etcd is a set of key/value pairs that are shared among many nodes, but they are usually very small values, and very small number of keys. I would be surprised if there were hundreds of thousands of keys in a single etcd installation. That just isn’t the type of thing that it is meant to do. And that reflects throughout.

For example, there is an inherit assumption through the codebase, that the data set is small, the each value is small, etc. When sending a snapshot over the wire, there is the assumption that it can all fit in memory and that this is a good thing to do so. Even more to the point, the system where we only push entries to the nodes on heartbeats is great when you are trying to batch multiple changes together, with each change being independent of all the others (usually). However, that really doesn’t work when what you actually need is something like the sort of things that we usually deal with.

I want to emphasize that I’m really evaluating the way etcd & go-raft are built and find them wanting for my own purposes, I think that they are doing quite great for the kind of problem that they are meant to solve.

Let us consider the kind of databases that we have. We usually have to deal with much larger values, typical documents are in the KB range, and are frequently hundreds of KB in size. We also tend to have quite a lot of them. Trying to put them all in memory (the etcd way) would be… suboptimal. Now, one option would be to try and do just that, and have each operation in RavenDB (put/delete documents) as a state machine command in Raft. The problem is that this really gets complicated very quickly, leaving aside the fact that it isn’t a general solution, and we really want to try to get to a general solution.

We don’t want to solve this once for RavenDB, and then for RavenFS, and then for… etc. And the reason that working at that level is tricky is that you need to push everything through the state machine, and everything in this case really does mean everything. That lead to a very different database design and implementation. It is also probably not really necessary. We can drop this a level or two down and instead of dealing at the database level, we can deal with that at the storage layer level. This is basically just an extension of the log shipping idea. Instead of using Raft for high level commands, just use Raft to create a consensus around the sequence of writes to the journal. That means that all the nodes will have the same journal, and thus have the same data in the storage.

That in turn means that an “entry” can actually be pretty large (mutli MB range, sometimes). And because the journal is purely sequential, we need to issue that command to all the nodes as soon as it is submitted to the Raft state machine.

Now, the reason that the go-raft implementation waits for the heartbeat is that it batches things up nicely, and really speed up the general case. But we don’t need to do that for what we are going to do. Or, to be rather more exact, we have already done just that. That is basically what Voron’s transaction merging is all about. And since we can only handle writes as the leader, and since we will merge concurrent writes anyway, that is going to end up as a very similar behavior under load, and faster without load. At least that is what the initial thinking is saying.

Snapshots, also something quite important for Raft, can be handle very simply by just doing a backup/restore over the network, by streaming all the data.

And the rest is just implementing Raft Smile.

Published at

Originally posted at

RavenDB Success Stories: Codealike

I just run into the following case study from Microsoft:

Codealike

*
We were amazed by how simple it was to embed RavenDB into our Windows Azure application. This gave us a huge advantage in getting our solution to market in just six weeks.
*

Federico Andres Lois
Co-Founder, Codealike

*

Tags:

Published at

Originally posted at

Comments (2)

Usage conventions for using Voron

As we are gearing up to do more & more stuff in Voron, it occurred to me that while we have settled on a good technological system for it, we haven’t settled on a real set of conventions for real use. We’re probably going to see a lot of use in Voron, and we want to see some consistency and best practices there.

Voron is a key/value store that expose a sorted tree abstraction. You can have as many trees as you would like. And the keys & values are both arbitrary byte strings. Given that, let us try to bring some order to the mix.

Don’t use the root tree

The root tree should reserved for handling of other trees, and not for the use of data of any kind. The only case where using the root tree is fine is if you don’t have any other trees. That tend to be a rare occasion, though. See next topic.

Have a $metadata tree

Always have a $metadata tree, that gives you information about the actual database you are using. For example, you’ll want to have things like the storage version, the database id (always a guid, to be able to tell dbs from their backups), etc.

Alpha numeric values only for tree names, please

You can use any value as a tree name, but you really want to limit yourself to the printable ASCII set. This is recommended because dumping the data to any other format (think debugging) would be greatly enhanced if you can actually read the tree names.

Use @tree for super trees

It is very common to have trees where all their data is handled via MutiAdd, MultiRead, etc. We call such trees super trees (they are trees that contains trees, also see big table). While they are usually used for indexing, there are many cases where you want to do that for things like queues, general run of the meal data (this is great for holding edges in a graph, for example), etc.

Prefix ix_ for all indexing trees

I’m not so sure about this, but it is worth mentioning. The purpose here is to distinguish between standard trees and trees that can be rebuild from scratch if needed. That can be of help in diagnostics mode, for example.

Dynamic trees should be explained

It is very easy to create trees in Voron. But like files, you don’t just create some around for no purpose. A tree cost 4KB (minimum), and more importantly, if you are looking at your storage, you want to be able to make sense of things. If you are using a tree as a queue for a particular destination, make sure that you name it appropriately.

Alternatively, use the $metadata tree to keep track of what goes where.

Avoid non printable key names

Just like in tree names, you can use any byte string in a key name, but you want to be able to read debug data or run diagnostics, it would really help if you could actually look at the data .

Remember, sequential writes are best

Voron will deal with random writes just fine, but it would be far  faster to write if you’ll arrange things so they are sequential. It is fine if you have once in a while a random write. But try to keep things sequential if at all possible. On that node, sequential for us means increasing, all of our optimizations assumes that. Decreasing sequential data is currently not as optimized.

Write and end, delete at start

This is a common operation you need for queues. It is generally better to do that with writing of the data at the end and removing from start. If you can, avoid just deleting stuff all over the place. Again, that works just fine, but we’ve optimized Voron to handle this scenario very well.

Keep the data simple

Voron does a lot with memory mapping. If the data you can use can be read directly, you can literally just access it off our own buffer, and have no copy required at all.

And that is all I have for now…

Tags:

Published at

Originally posted at

Distributed counters feature design

This is another experiment with longer posts.

Previously, I used the time series example as the bed on which to test some ideas regarding feature design, to explain how we work and in general work out the rough patches along the way. I should probably note that these posts are purely fiction at this point. We have no plans to include a time series feature in RavenDB at this time. I am trying to work out some thoughts in the open and get your feedback.

At any rate, yesterday we had a request for Cassandra style counters at the mailing list. And as long as I am doing feature design series, I thought that I could talk about how I would go about implementing this. Again, consider this fiction, I have no plans of implementing this at this time.

The essence of what we want is to be able to… count stuff. Efficiently, in a distributed manner, with optional support for cross data center replication.

Very roughly, the idea is to have “sub counters”, unique for every node in the system. Whenever you increment the value, we log this to our own sub counter, and then replicate it out. Whenever you read it, we just sum all the data we have from all the sub counters.

Let us outline the various parts of the solution in the same order as the one I used for time series.

Storage

A counter is just a named 64 bits signed integer. A counter name can be any string up to 128 printable characters. The external interface of the storage would look like this:

   1: public struct CounterIncrement
   2: {
   3:     public string Name;
   4:     public long Change;
   5: }
   6:  
   7: public struct Counter
   8: {
   9:     public string Name;
  10:     public string Source;
  11:     public long Value;
  12: }
  13:  
  14: public interface ICounterStorage
  15: {
  16:     void LocalIncrementBatch(CounterIncrement[] batch);
  17:  
  18:     Counter[] Read(string name);
  19:  
  20:     void ReplicatedUpdates(Counter[] updates);
  21: }

As you can see, this gives us very simple interface for the storage. We can either change the data locally (which modify our own storage) or we can get an update from a replica about its changes.

There really isn’t much more to it, to be fair. The LocalIncrementBatch() increment a local value, and Read() will return all the values for a counter. There is a little bit of trickery involved in how exactly one would store the counter values.

For now, I think we’ll store each counter as two step values. We’ll have a tree of multi tree values that will carry each value from each source. That means that a counter will take roughly 4KB or so. This is easy to work with and nicely fit the model Voron uses internally.

Note that we’ll outline additional requirement for storage (searching for counter by prefix, iterating over counters, addresses of other servers, stats, etc) below. I’m not showing them here because they aren’t the major issue yet.

Over the wire

Skipping out on any optimizations that might be required, we will expose the following endpoints:

  • GET /counters/read?id=users/1/visits&users/1/posts <—will return json response with all the relevant values (already summed up).
    { “users/1/visits”: 43, “users/1/posts”: 3 }
  • GET /counters/read?id=users/1/visits&users/1/1/posts&raw=true <—will return json response with all the relevant values, per source.
    { “users/1/visits”: {“rvn1”: 21, “rvn2”: 22 } , “users/1/posts”:  { “rvn1”: 2, “rvn3”: 1 } }
  • POST /counters/increment <– allows to increment counters. The request is a json array of the counter name and the change.

For a real system, you’ll probably need a lot more stuff, metrics, stats, etc. But this is the high level design, so this would be enough.

Note that we are skipping the high performance stream based writes we outlined for time series. We’ll probably won’t need them, so that doesn’t matter, but they are an option if we need them.

System behavior

This is where it is really not interesting, there is very little behavior here, actually. We only have to read the data from the storage, sum it up, and send it to the user. Hardly what I’ll call business logic.

Client API

The client API will probably look something like this:

   1: counters.Increment("users/1/posts");
   2: counters.Increment("users/1/visits", 4);
   3:  
   4: using(var batch = counters.Batch())
   5: {
   6:     batch.Increment("users/1/posts");
   7:     batch.Increment("users/1/visits",5);
   8:     batch.Submit();
   9: }

Note that we’re offering both batch and single API. We’ll likely also want to offer a fire & forget style, which will be able to offer even better performance (because they could do batching across more than a single thread), but that is out of scope for now.

For simplicity sake, we are going to have the client just a container for all of endpoints that it knows about. The container would be responsible for… updating the client visible topology, selecting the best server to use at any given point, etc.

User interface

There isn’t much to it. Just show a list of counter values in a list. Allow to search by prefix, allow to dive into a particular counter and read its raw values, but that is about it. Oh, and allow to delete a counter.

Deleting data

Honestly, I really hate deletes. They are very expensive to handle properly the moment you have more than a single node. In this case, there is an inherent race condition between a delete going out and another node getting an increment. And then there is the issue of what happens if you had a node down when you did the delete, etc.

This just sucks. Deletion are handled normally, (with the race condition caveat, obviously), and I’ll discuss how we replicate them in a bit.

High availability / scale out

By definition, we actually don’t want to have storage replication here. Either log shipping or consensus based. We actually do want to have different values, because we are going to be modifying things independently on many servers.

That means that we need to do replication at the database level. And that leads to some interesting questions. Again, the hard part here is the deletes. Actually, the really hard part is what we are going to do with the New Server Problem.

The New Server Problem dictates how we are going to bring a new server into the cluster. If we could fix the size of the cluster, that would make things a lot easier. However, we are actually interested in being able to dynamically grow the cluster size.

Therefor, there are only two real ways to do it:

  • Add a new empty node to the cluster, and have it be filled from all the other servers.
  • Add a new node by backing up an existing node, and restoring as a new node.

RavenDB, for example, follows the first option. But it means that in needs to track a lot more information. The second option is actually a lot simpler, because we don’t need to care about keeping around old data.

However, this means that the process of bringing up a new server would now be:

  1. Update all nodes in the cluster with the new node address (node isn’t up yet, replication to it will fail and be queued).
  2. Backup an existing node and restore at the new node.
  3. Start the new node.

The order of steps is quite important. And it would be easy to get it wrong. Also, on large systems, backup & restore can take a long time. Operationally speaking, I would much rather just be able to do something like, bring a new node into the cluster in “silent” mode. That is, it would get information from all the other nodes, and I can “flip the switch” and make it visible to clients at any point in time.  That is how you do it with RavenDB, and it is an incredibly powerful system, when used properly.

That means that for all intents and purposes, we don’t do real deletes. What we’ll actually do is replace the counter value with delete marker. This turns deletes into a much simple “just another write”. It has the sad implication of not free disk space on deletes, but deletes tend to be rare, and it is usually fine to add a “purge” admin option that can be run on as needed basis.

But that brings us to an interesting issue, how do we actually handle replication.

The topology map

To simplify things, we are going to go with one way replication from a node to another. That allows complex topologies like master-master, cluster-cluster, replication chain, etc. But in the end, this is all about a single node replication to another.

The first question to ask is, are we going to replicate just our local changes, or are we going to have to replicate external changes as well? The problem with replicating external changes is that you may have the following topology:

image

Now, Server A got a value and sent it to Server B. Server B then forwarded it to Server C. However, at that point, we also have a the value from Server A replicated directly to Server C. Which value is it supposed to pick? And what about a scenario where you have more complex topology?

In general, because in this type of system, we can have any node accept writes, and we actually desire this to be the case, we don’t want this behavior. We want to only replicate local data, not all the data.

Of course, that leads to an annoying question, what happens if we have a 3 node cluster, and one node fails catastrophically. We can bring a new node in, and the other two nodes will be able to fill in their values via replication, but what about the node that is down? The data isn’t gone, it is still right there in the other two nodes, but we need a way to pull it out.

Therefor, I think that the best option would be to say that nodes only replicate their local state, except in the case of a new node. A new node will be told the address of an existing node in the cluster, at which point it will:

  • Register itself in all the nodes in the cluster (discoverable from the existing node). This assumes a standard two way replication link between all servers, if this isn’t the case, the operators would have the responsibility to setup the actual replication semantics on their own.
  • New node now starts getting updates from all the nodes in the cluster. It keeps them in a log for now, not doing anything yet.
  • Ask that node for a complete update of all of its current state.
  • When it has all the complete state of the existing node, it replays all of the remembered logs that it didn’t have a chance to apply yet.
  • Then it announces that it is in a valid state to start accepting client connections.

Note that this process is likely to be very sensitive to high data volumes. That is why you’ll usually want to select a backup node to read from, and that decision is an ops decision.

You’ll also want to be able to report extensively on the current status of the node, since this can take a while, and ops will be watching this very closely.

Server Name

A node requires a unique name. We can use guids, but those aren’t readable, so we can use machine name + port, but those can change. Ideally, we can require the user to set us up with a unique name. That is important for readability and for being able to alter see all the values we have in all the nodes. It is important that names are never repeated, so we’ll probably have a guid there anyway, just to be on the safe side.

Actual Replication Semantics

Since we have the New Server Problem down to an automated process, we can choose the drastically simpler model of just having an internal queue per each replication destination. Whenever we make a change, we also make a note of that in the queue for that destination, then we start an async replication process to that server, sending all of our updates there.

It is always safe to overwrite data using replication, because we are overwriting our own data, never anyone else.

And… that is about it, actually. There are probably a lot of details that I am missing / would discover if we were to actually implement this. But I think that this is a pretty good idea about what this feature is about.

Published at

Originally posted at

Time series feature design: Storage replication & the bee’s knees

Being able to handle replication at the storage level is a really nice feature to have. More than that, it is a feature that can be broadly applied. But… a database is a lot more than just storage. And being able to just move the data around between machines is nice, but there are other things we have to take into account.

In particular, when we replicate via storage changes, we don’t have a good way to take actions on changes. Most of the time, that means that we can’t actually rely on internal caches, and would probably have to deal with that somehow in another fashion. But there are usually secondary processing that is done on a node that would have to be accounted for.

For example, let us assume that we had the ability to replicate RavenDB (docs) changes between machines using storage replication. The problem here is that we would be replicating the documents, but not the indexes, and when we do that, we would need to index the changed documents on the destination node. However, that would actually require two data stores, one for the actual documents data, and one for all of the non replicated data (indexing, stats, etc).

In other words, I think that such a database would have to be designed specifically for that scenario.

In addition to that, it would probably be best for the storage replication to also be annotated with information for higher level code. So if you modify this range in the file, you’ll also know that you need to drop the following documents from the cache.

Published at

Originally posted at

What does the test say?

Because RavenDB is a database, a lot of the tests we have to run are pretty long. For example, we need to touch the disk a lot, and we have a lot of networked tests.

that means that running this test suite can take a while. But the default information we get is pretty lousy. Just the test count and that is it. But when a test hang, and they do if we have bugs, it make it very hard to figure out where the culprit is.

So we forked xunit and added a tiny feature to the console runner:

image

Time series feature design: The Consensus has dRafted a decision

So, after reaching the conclusion that replication is going to be hard, I went back to the office and discussed those challenges and was in general pretty annoyed by it. Then Michael made a really interesting suggestion. Why not put it on RAFT?

And once he explained what he meant, I really couldn’t hold my excitement. We now have a major feature for 4.0. But before I get excited about that (we’ll only be able to actually start working on that in a few months, anyway), let us talk about what the actual suggestion was.

Raft is a consensus algorithm. It allows a distributed set of computers to arrive into a mutually agreed upon set of sequential log records. Hm… I wonder where else we can find sequential log records, and yes, I am looking at you Voron.Journal.

The basic idea is that we can take the concept of log shipping, but instead of having a single master/slave relationship, we change things so we can put Raft in the middle. When committing a transaction, we’ll hold off committing the transaction until we have a Raft consensus that it should be committed. The advantage here is that we won’t be constrained any longer by the master/slave issue. If there is a server down, we can still process requests (maybe need to elect a new cluster leader, but that is about it).

That means that from an architectural standpoint, we’ll have the ability to process write requests for any quorum (N/2+1). That is a pretty standard requirement for distributed databases, so that is perfectly fine.

That is a pretty awesome thing to have, to be honest, and more importantly, this is happening at the low level storage layer. That means that we can apply this behavior not just to a single database solution, but to many database solutions.

I’m pretty excited about this.

Shiny features in the depth: New index optimization

One of the nice features in RavenDB 3.0 is optimizing the process of creating a new index. In particular, we want to optimize it when you create a new index on a small collection in a large database.

If you have a small database, you don’t care, it is going to complete quickly anyway. If you are creating an index on a collection that compose a significant amount of the documents in the database, you don’t care, you are going to have to do a lot of work anyway. But the common case for a big database is that you usually have one very big collection, and much smaller collections for everything else.

In RavenDB 2.x, you still have to pay the full price for indexing everything, but that isn’t the case in RavenDB 3.0. What we have done is to effectively optimize the process so that in this case, we will preload all of the documents taking part in the relevant collection, and send them directly to be indexed.

We do this by utilizing the Raven/DocumentsByEntityName index. Which has already indexed everything in the database anyway. This is a nice little feature, because it allows us to really take advantage of the work we already did long ago. Using one index to pre-populate another is a neat trick, and one that I am very happy about.

Because this is a new code path, it also means that it is actually executed outside of standard indexing. And that in turn means that adding a new index will not impact other indexes at all.

This is a small feature, but it does address a common pain point our users have with working in RavenDB in production.

Reminder, we have our upcoming RavenDB Conference in April, where we’ll discuss other stuff in the 3.0 release.

Tags:

Published at

Originally posted at

Comments (8)

RavenDB 3.0 Status Update

We are gearing up for the RavenDB Conference in April and we just released a private alpha preview sneak peek to a few external people. But we have been working on RavenDB 3.0 for the past 18 months or so, some bits in it are actually dated from 2011(!) that we only now are able to actually put into production. That is a lot of work that is going on and it is easy to actually get lost in what is going on there.

So, without further ado, here are the major highlights of RavenDB 3.0…

Indexing

Yes, we did more work on improving indexing performance. But that is actually secondary. What we really focused on this release are operational indexing concerns.

What does this means? Well, to start with, internally we don’t use the index name any longer. That means that we can do silly things like make an index delete async. Users that have large indexes, especially map/reduce or indexes with LoadDocument calls found that deleting an index can take a very long while. Now this is no longer the case, we are now able to immediately delete the index, and actually do the cleanup in the background.

For that matter, another operational concern people has is the introduction of new indexes. Especially if the index in question covers just small part of a very large database, that used to take a very long time. The index would have to go through all the documents in the database to complete indexing. Now, we are able to make several optimizations that means that we can take just the relevant documents, and complete indexing much more quickly in this scenario.

Introducing a new index would split it entirely from the other indexes while it is running, so you won’t have to deal with a new index slowing down other indexes. And neither will big deletes impact indexing performance in this manner any longer, we have much better interleaving of that now.

Still on an operational bent, we now have much better reporting on what is actually being indexed. You can see what the indexes are doing, and act accordingly.

From a development point of view, we have added some nice things. The ability to index an attachment, so you can index big text without it residing in documents. We have also added some better situational awareness in the indexing code. We have some people who were doing… funny things there. Indexes that were producing hundreds of index entries per every document they indexed. Then we had to deal with the associated performance problems. We now can detect and warn about that, and we let the user specific the valid limits on a per index case.

There are other stuff in indexing, but I want to go over the rest of what we did for 3.0…

RavenFS

RavenFS was created because RavenDB’s attachments are nice, but they aren’t nearly good enough. We have a lot of users that want to use attachments, then they find that they can’t see them, search on them in meaningful ways, etc. More importantly, they are limited because we intended them to be of relatively small size. Users really asked us for something better, and RavenFS is the answer.

We are talking about a replicated file system, which supports very large files as well as all the facilities to work and manage them. It was explicitly designed to  be geo-distributed and it can drastically decrease the network load on systems that need to share very large files that change frequently.

I’m going to talk about RavenFS quite a bit in my keynote in the RavenDB Conference, but I think that it is really cool and there are some very nice use cases for it. Just for fun, it has been used in production by several customers for the past 2 years, so we already have some great experience with that.

JVM Client API

A fully functional JVM client opens us for more fronts with regards to who can make use of RavenDB. We already have people building applications using that. And we intend to have more clients for additional platforms after 3.0 is shipped.

Internal changes

There have been a lot of that, actually. But the one of the most important ones is that we are now hosting RavenDB on top of OWIN and Web API. The change, from our own HTTP server, was done in order to help our users have a better foundation to understand how RavenDB works internally, and to encourage contributions. It also allow us to do some nice things, like have an end point that documents all the endpoints in a RavenDB server.

Another important change is that we are moving away from the Silverlight studio in favor of a brand new HTML5 studio. That is quite exciting, especially because the performance and responsiveness of the system as a whole become so much better. And it doesn’t hurt that we don’t have to deal with the complexities of Silverlight.

Voron

This one probably got the most attention, but hopefully it will be least noticed once you actually get the bits. Voron is our new storage engine. In fact, it is more than that, it is a new way for us to store data, which we use in RavenDB to store our transactional data. The reason it is more than that is that it isn’t limited to what RavenDB currently needs to do. It can do much more, and we are already in the process of doing quite a lot more with it than one might suspect from outside. I’ll leave that for later, and just talk about what we have already done.

RavenDB now have the option of running on Voron. In fact, we have tested it with the entire RavenDB test suite (over 3,000 tests) and it passes with flying colors.

Voron will allow us to run on Linux, at some point, but it is a lot more important that it allows us to very carefully tune our storage usage and get a much better appreciation for how we are actually doing things. We expect to be able to do some really nice things with it, and it has already shown itself to be competitive with regards to performance against Esent.

Operations

There is always that, isn’t it. And there is a reason why is is last, but never least, in this list. (Try to say that repeatedly, fast Smile).

We kicked off performance counters, which caused no end of operational headaches (corrupted counters, permission issues, hanging, etc) in favor of an internal metrics library. Because it is internal, we are able to add a lot more metrics and a lot more meaningful metrics to the system.

We have new endpoints that expose even more internal states. We improved periodic backup support so it would be much nicer to work with (we now allow to define: full export every week, periodic export every day). There are quite a few goodies there available for the ops people to get insight into what is going on.

And… those are the highlights, and the code aspect of things.

We have  a team of about a dozen people working on RavenDB at this time, and we keep growing. This is quite exciting, and I’m really looking forward to getting to meet our users in the conference…

Here is a hint, there are going to be surprises…

Tags:

Published at

Originally posted at

Comments (9)

Time series feature design: Replication

Armed with the knowledge about replication strategies from my previous post, we can now consider this in the context of the time series database.

We actually have two distinct pieces of data that we track. We have the actual time data, the timestamp and value that we keep track of, and we have the series information (tags, mostly).  We can consider using log shipping here, and that would give us a great way to get a read replica. But the question is why we would want to do that. It is nice to get a replica, but that replica would have to be read only. Is that useful?  It could take over, if the master is down, but that would mean that the master would have to stay down (or converted to a slave). And divergent writes are a problem.

While attractive as a cheap way to handle replication, I don’t like this very much.

So that leaves us with using a multi write partners situation. In this case, we can allow the two servers to operate in tandem. We need to have some way to resolve conflicts, and this is where things gets a bit messy.

For series data, it is trivial to just use some form of last write wins. This assumes a synchronized clock between the servers, but we’ll leave that requirement for now.

The problem is with the actual time data. Conceptually, we intend to store the information like this:

image

The problem is how do you detect conflicts. And are they really even possible. Let us assume that we want to update a particular value at time T on both servers. Server A replicates to server B, and now we need to decide how to deal with it. Ignore the value? Overwrite the value?

The important thing is that we need some predictable way to handle this that will end up with all the nodes in the cluster having the same agreed upon value. The simplest scenario, assuming a clock sync, is to use the write timestamp. But that would require us to keep the write time stamp. Currently we can use just 16 bytes for each time record. But recording the write timestamp will increase our usage to 24 bytes. That is a 50% increase just to handle conflicts. I don’t want to pay that.

The good thing about time series data is that a single value isn’t that important, and the likelihood that they will be changed it relatively small. We can just decide to say: We’ll choose a value, for example, we will choose the maximum value for that time, and be done with it. That has its own set of problems, but we’ll deal with that in a bit. We need to discuss how we deal with replication in general, first.

Let us imagine that we have 3 servers:

  • Server A replicates to B
  • Server B replicates to C
  • Server C replicates to A

We have concurrent writes to the same time value on both server A and B. For the purpose of the discussion, let us assume that we have a way to resolve the conflict.

Server A notifies Server B about the change, but server B already have a different value for that. Conflict resolution is run, and we have a new value .That value need to be replicated down stream. It goes to Server C, who then replicate it to Server A, who then replicates it to Server B? Ad infinitum?

I intentionally chose this example, but the same thing can happen with just two servers replicating to one another (master/master). And the problem here is that in order to be able to actually track this properly, we are going to need to keep a lot of metadata around, per value. While I can sort of accept the need to keep the write time (thus requiring 50% more space per value), the idea of holding many times more metadata for replication purposes than the actual data we want to replicate seems… silly at best.

Log shipping replication it is, at least until someone can come up with a better way to resolve the issues above.

On replication strategies, or the return of the long article

Meta note: I’ve been doing short series of blog posts for a while. I thought that this would be a good time to change. I am not sure how big this blog post is going to be, but it is going to be big. Please let me know about which approach you find better, and your reasoning.

I have been thinking about this quite a lot in the past few days. I am trying to see if there is a common solution to replication in general that we can utilize across a number of solutions. If we can do that, we can provide much better feature set for a wide variety of scenarios.

But before we can talk about how to actually implement replication, we need to talk about what type of replication we are talking about. We are assuming a single database (non sharded, running on multiple nodes). In general, there appears to be the following options:

  • Master / slaves
  • Primary / secondaries
  • Multi write partners
  • Multi master

Those are just designations that I’ll use for this series of blog posts. For the purpose of those posts, they are very different beast indeed.

The master/ slaves approach is talking specifically for a scenario where you have a single write master and one or more slaves. A key aspect of this strategy is that you can never (at least under normal operations) make any change whatsoever to the slaves. They are pure reads, and they can not be changed to become writeable without severing their ties to the master or risking data corruption.

A common example of such an approach is log shipping. I’ll discuss it in detail later on, but you can look at the docs for any such system, changing a slave to be writable is a decidedly non trivial process. And for a good reason.

The primary / secondaries mode is very similar to the master / slaves approach, however, here we have an explicit option for a secondary to become the primary. There can be only one primary, but the idea is that we allow a much easier way to switch the primary node. MongoDB uses such a system.

Multi write partners systems allow any node to accept a write, and it will take care of distributing the change to the other nodes. It also need, unlike the other options so far, to deal with conflicts. The ability of two users to write to the same value on multiple nodes at the same time. However, multi write partners usually make assumptions about their partners. For example, that they are relatively in sync, and that there is a separate protocol for bringing a new node online into the partnership that is outside the usual replication metric.

Multi master systems allow, accept and encourages nodes to come and go as they please, they assume that writes can and will conflict, and the need to resolve that on an ongoing basis. There are no expectations from the other nodes about being relatively in sync, and it is common to “re-seed” a new node by just starting replication to it, which means that you need to replicate all the data from the beginning of time to it. It is also common to have a node pop up once in a blue moon, expect to get all changes that happened while it was gone, and then drop off again.

Let us look at the actual implementation details of each, including some examples, and hopefully it’ll be clearer what I am talking about.

 

Log Shipping

Master / slaves is usually implemented via log shipping. The easiest way to think about log shipping is that the master database will send (magically, we don’t really care much how at this point) to the slaves instructions on how to directly modify the database files. In other words, conceptually, it is sending them the following:

   1: writep(fd1, 1024, new[]{ 17,85,124,13,86}, 5);
   2: writep(fd1, 18432, new[]{ 12,95,34,83,76,32,59}, 7);b

Those are very low level modifications, as you can imagine. The advantage here is that it is very easy to capture and replay those changes. The disadvantage is that you cannot really do anything else. Because the changes are happening at the very bottom of the stack, there is no chance to run any sort of logic. We are just writing to the file, same as the master server did.

This is the key reason why it is so hard for a slave to allow writes. The moment it makes any independent write, it opens itself up to the risk that the master would also do a write, that would generate data corruption. That is why you have to do the major song & dance if you want to switch the master & the slave. You have to go through all of this trouble to ensure that you don’t ever have a scenario where you have a write happening on both ends.

Once that happens, you can never ever get those two in sync again. It is just happening at too low a level.

Generating a new node, however, is very easy. Make sure to keep the journal around, do a full backup of the database and move it to another node. Then start shipping the logs over. Because they started at the same point, they can be safely applied.

Note that this version is very sensitive to versioning issues. You cannot have even a tiny change in the versions of working with the low level storage, because then all hell might break lose. This method is very good for generating read replicas. Indeed, this is what this is used for most of the time.

In theory, you can even get it to do failovers, because while the master is down, the slave can write. The problem is how do you handle a case where the slave think that the master is down, and the master think that everything is fine. At that point, you might have both of them accept writes, resulting in an unmergable situation.

In theory, since they share a common root, you can decide that one of them is the victor, and go with that, but that would result in losing data from the loser server, and probably data that you have no actual way of getting back. The changes we keep track of here are very small, and likely too granular to allow you to actually do something meaningful to extract the changed information.

Oplog

This is actually quite similar to the log shipping method, but instead of sending the very low level file I/O operations, we’re actually sending higher level commands. This leads to a quite a few benefits as far as we are concerned. The primary server can send its log as:

   1: set("users/1", {"name": "oren" });
   2: set("users/2", {"name": "ayende" });
   3: del("users/1");

Executing this set of instruction on the secondary will result in identical state on the secondary.  Unlike Log Shipping option, this actually require the secondary server to perform work, so it is more expensive than just apply the already computed file updates.

However, the upside of this is that you can have a far more readable log. It is also much easier to turn a secondary into a primary. Mostly, this is silly. The actual operation is the exact same thing. But because you are working at the protocol level, rather than the file level. You can get some interesting benefits.

Let us assume that you have the same split brain issue, when both primary & secondary think that they are the primary. In the Log Shipping case, we had no way to reconcile the differences. In the case of Oplog, we can actually do this.  The key here is that we can:

  • Dump one of the servers rejected operations into a recoverable state.
  • Attempt to apply both severs logs, hoping that they didn’t both work on the same document.

This is the replication mode used by MongoDB. And it has chosen the first approach for handling such conflicts. Indeed, that is pretty much the only choice that it can safely make. Two servers making modifications to the same object is always going to require manual resolution, of course. And it is usually better to have to do this in advance and explicitly rather than “sometimes it works”.

You can see some discussion on how merging back divergent writes works in MongoDB here. In fact, continuing to use the same source, you can see the internal oplog in MongoDB here:

   1: // Operations
   2:  
   3: > use test
   4: switched to db test
   5: > db.foo.insert({x:1})
   6: > db.foo.update({x:1}, {$set : {y:1}})
   7: > db.foo.update({x:2}, {$set : {y:1}}, true)
   8: > db.foo.remove({x:1})
   9:  
  10: // Op log view
  11:  
  12: > use local
  13: switched to db local
  14: > db.oplog.rs.find()
  15: { "ts" : { "t" : 1286821527000, "i" : 1 }, "h" : NumberLong(0), "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
  16: { "ts" : { "t" : 1286821977000, "i" : 1 }, "h" : NumberLong("1722870850266333201"), "op" : "i", "ns" : "test.foo", "o" : { "_id" : ObjectId("4cb35859007cc1f4f9f7f85d"), "x" : 1 } }
  17: { "ts" : { "t" : 1286821984000, "i" : 1 }, "h" : NumberLong("1633487572904743924"), "op" : "u", "ns" : "test.foo", "o2" : { "_id" : ObjectId("4cb35859007cc1f4f9f7f85d") }, "o" : { "$set" : { "y" : 1 } } }
  18: { "ts" : { "t" : 1286821993000, "i" : 1 }, "h" : NumberLong("5491114356580488109"), "op" : "i", "ns" : "test.foo", "o" : { "_id" : ObjectId("4cb3586928ce78a2245fbd57"), "x" : 2, "y" : 1 } }
  19: { "ts" : { "t" : 1286821996000, "i" : 1 }, "h" : NumberLong("243223472855067144"), "op" : "d", "ns" : "test.foo", "b" : true, "o" : { "_id" : ObjectId("4cb35859007cc1f4f9f7f85d") } }

You can actually see the chain on command to oplog entry. The upsert command in line 7 was turned into an insert in line 18, for example. There appears to also be a lot of work done to avoid having to do any sort of computable work, in favor of resolving things to a simple idempotent operation.

For example, if you have a doc that looks like {counter:1} and you do an update like {$inc:{counter:1}} on the primary, you’ll end up with {counter:2} and the oplog will store {$set:{counter:2}}. The secondaries will replicate that instead of the $inc.

That is pretty nice feature, since it mean that you can much apply changes multiple times and end with the same result. But it all leads to the end result, in which you can’t merge divergent writes.

You do get a much better approach for actually going over the data and doing the fixup yourself, but still.. I don’t really like it.

Multi write partners

In this mode, we have a set of servers, each of which is familiar with their partners. All the writes coming are accepted, and logged. Replication happen from the source server contacting all of the destination servers and asking them: What is the last you heard from me? Here are all of my changes since then. Critically, it is at this point that we can trim the log for all of the actions that were already replicated to all of the servers.

A server being down means that the log of changes to go there is going to increase in size until the partner is up again, or we remove the entry for that server from our replication destination.

So far, this is very similar to how you would structure an oplog. The major difference is how you structure the actual data you log. In the oplog scenario, you’re going to write the changes that happens to the system. And the only way to act on this is to actually apply the op log in the same sequence as it was generated. This leads to a system where you can always have just a single primary node. And that leads to situations when split brains will result in data loss or manual merge steps.

In MWP case, we are going to keep enough context (usually full objects) so that we can give the user a better option to resolve the conflict. This also gives us the option of replaying the log in non sequential manner.

Note, however, that you cannot just bring a new server online and expect it to start playing nicely. You have to start from a known state, usually a db backup of an existing node. Like the log shipping scenario, the process is essentially, start replicating (to the currently non existent server), that will ensure that the log will be there when we actually have the new server. Backup the database and restore on a secondary server. Configure to accept replication from the source server.

The complexities here are that you need to deal with operations that you might already have. That is why this is usually paired with vector clocks, so you can automatically resolve such conflicts. When you cannot resolve such conflicts, this falls down to manual user intervention.

Multi Master

Multi master systems are quite similar to multi write partners, but they are designed to operate independently. It is common for servers to be able communicate with one another only rarely. For example, a mobile system that is only able to get connected just a few hours a week. As such, we cannot just cap the size of the operations to replicate. In fact, the common way to bring a new server up to speed is just to replicate to it. That means that we need to be able to replicate, essentially from any point in the server history, to a new server.

That works great, as long as you don’t have deletes. Those do tend to make things harder, because you need to keep track of those, and replicate them. RavenDB and CouchDB are both multi master systems, for example. Conflicts works the same way, pretty much, and we use a vector clock to determine if a value is in conflict or not.

 

Divergent writes

I mentioned this a few times, but I didn’t fully explain. For my purposes, we assume that we are using 2 servers (and yes, I know all about quorums, etc. Not relevant for this discussion) running in master/slave mode.

At some point, the slave think that the master is down and takes over, and the master doesn’t notice this and still think it is the master. During this time, both server accept the following writes:

Server A Server B
write users/1 wrier users/2
write users/3 write users/3
delete users/4 delete users/5
delete users/6 write users/6
write users/7 delete all users
set all users to active write users/8

After those operation happen, we restore communication between the two servers and they need to decide how to resolve those changes

Getting down to business

Okay, that is enough talking about what those terms mean. Let us consider the implications of using them. Log shipping is by far the simplest method to use. Well, assuming that you actually have a log mechanism, but most dbs do. It is strictly one writer model, and there is absolutely no way to either resolve divergent writes or even to find out what they were. The good thing about log shipping is that it is quite easy to get this working without actually needing to care anything about the actual data involved. We work directly at the file level, we don’t care at all about what the data is. The problem is that we can’t even solve simple conflicts, like writes to the different objects. This is because we are actually working at the file level, and all the changes are there. Attempting to merge changes from multiple logs would likely result in file corruption. The up side is that it is probably the most efficient way to go about doing this.

Oplog is a step above log shipping, but not a big one. It doesn’t resolve the divergent writes issues. This is now an application level protocol. The log needs to contain information specific to the actual type of data that we store. And you need to write explicit code to handle this. That is nice, but it also require strict sequence of all operations. Now, you can try to merge things between different logs. However, you need to worry about conflicts, and more to the point, there is usually nothing in the data itself that will help you even detect conflicts.

Multi write partners are meant to take this up a notch. They do keep track of the version history (usually via vector clocks). Here, the situation is more complex, because we need to explicitly decide how to deal with conflicts (either resolve automatically or defer to user decision), but also how to handle distribution of updates. Usually they are paired with some form of logic that tells you how to direct your writes. So all writes for a particular piece of data would go to a preferred node, to avoid generating multiple versions. The data needs to contains some information about that, so we keep vector clock information around. Once we sent the changes to all our partners, we can drop them, saving in space.

Multi master is meant to ensure that you can have partners that might only see one another occasionally, and it makes no assumptions about the topology. It can handle a node that comes on, get some data replicated, and drop off for a while (or forever). Each node is fully independent, and while they would collaborate with others, they don’t need them. The downside here is that we need to keep track of some things forever. In particular, we need to keep track of deletes, to ensure that we can get them to the remote machines.

What about set operations?

Interesting enough, that is probably the hardest issue to resolve. Consider the case when you have the following operations happen:

Server A Server B
write users/7 delete all users
set all users to active write users/8 (active: false)

What should be the result of this? There isn’t a really good answer. Should users/8 be set to active: true? What about users/7, should it be deleted or kept?

It gets hard because you don’t have good choices. The hard part here is actually figuring out that you have a conflict. And there isn’t a really good way to handle set operations nicely with conflicts. The common solution is to translate this to the actual operations made (delete users/1,user/2, users/3 – writer users/8, users/5) and leave it at that. The set based operation is translated to the actual individual operations that actually happened. And on that we can detect conflicts much more easily.

Log shipping is easiest to work with, operationally speaking. You know what you get, and you deal with that. Oplog is also simple, you have a single master, and that works. Multi master and multi write partners requires you to take explicit notice of conflicts, selection of the appropriate node to reduce conflicts, etc.

In practice, at least in the scenarios relating to RavenDB, the ability to take a server offline for weeks or months doesn’t seem to be used that often. The common deployment model is of servers running as steady partners. There are some optimizations that you can do for multi write partners that are hard/impossible to do with multi master.

My current personal preference at this point would like to go with either log shipping or multi write master. I think that either one of them would be fairly simple to implement and support operationally. I think that I’ll discuss actual design for the time series topic using either option in my next posts.

Time series feature design: Querying over large data sets

What happens when you want to do an aggregation query over very large data set? Let us say that you have 1 million data points within the range you want to query, and you want to get a rollup of all the data in the range of a week.

As it turns out, there is a relatively simple solution for optimizing this and maintaining a relatively constant query speed, regardless of the actual data set size.

Time series data has the great benefit of being easily aggregated. Most often, the data looks like this:

image

The catch is that you have a lot of it.

The set of aggregation that you can do over the data is also relatively simple. You have mean, max, min, std deviation, etc.

The time ranges are also pretty fixed, and the nice thing about time series data is that the bigger the range you want to go over, the bigger your rollup window is. In other words, if you want to look at things over a week, you would probably use a day or hour rollup. If you want to see things over a month, you will use a week or a day, over a year, you’ll use a week or a month, etc.

Let us assume that the cost of aggregation is 10,000 operations per second (just some number I pulled because it is round and nice to work with, real number is likely several orders of magnitude higher). So if we have to run this over a set that is 1 million data points in size, with the data being entered on per minute basis. With 1 million data points, we are going to wait almost two minutes for the reply. But there is really no need to actually check all those data points manually.

What we can do is actually prepare, ahead of time, the rollups on an hourly basis. That gives us a summary on a per hour basis of just over 16 thousand data points, and will result in a query that runs in under 2 seconds. If we also do a daily rollup, we move from having a million data points to less than a thousand.

Actually maintaining those computed rollups would probably be a bit complex, but it won’t be any more complex than how we are computing map/reduce indexes in RavenDB (this is a private, and simplified, case of map/reduce). And the result would be instant query times, even on very large data sets.