Ayende @ Rahien


On replication strategies, or the return of the long article

Meta note: I’ve been writing short series of blog posts for a while. I thought that this would be a good time for a change. I am not sure how long this blog post is going to be, but it is going to be long. Please let me know which approach you prefer, and why.

I have been thinking about this quite a lot in the past few days. I am trying to see if there is a common approach to replication in general that we can utilize across a number of products. If we can do that, we can provide a much better feature set for a wide variety of scenarios.

But before we can talk about how to actually implement replication, we need to define what type of replication we mean. We are assuming a single database (non sharded, running on multiple nodes). In general, there appear to be the following options:

  • Master / slaves
  • Primary / secondaries
  • Multi write partners
  • Multi master

Those are just designations that I’ll use for this series of blog posts. For the purpose of those posts, they are very different beasts indeed.

The master / slaves approach refers specifically to a scenario where you have a single write master and one or more slaves. A key aspect of this strategy is that you can never (at least under normal operations) make any change whatsoever to the slaves. They are read-only, and they cannot be made writable without severing their ties to the master or risking data corruption.

A common example of such an approach is log shipping. I’ll discuss it in detail later on, but if you look at the docs for any such system, you’ll see that changing a slave to be writable is a decidedly non-trivial process. And for a good reason.

The primary / secondaries mode is very similar to the master / slaves approach. However, here we have an explicit option for a secondary to become the primary. There can be only one primary, but the idea is that we allow a much easier way to switch the primary node. MongoDB uses such a system.

Multi write partners systems allow any node to accept a write, and take care of distributing the change to the other nodes. Unlike the other options so far, they also need to deal with conflicts: two users writing to the same value on different nodes at the same time. However, multi write partners usually make assumptions about their partners. For example, that they are relatively in sync, and that there is a separate protocol, outside the usual replication process, for bringing a new node online into the partnership.

Multi master systems allow, accept, and encourage nodes to come and go as they please. They assume that writes can and will conflict, and that those conflicts need to be resolved on an ongoing basis. There are no expectations from the other nodes about being relatively in sync, and it is common to “re-seed” a new node by just starting replication to it, which means that you need to replicate all the data from the beginning of time to it. It is also common to have a node pop up once in a blue moon, expect to get all changes that happened while it was gone, and then drop off again.

Let us look at the actual implementation details of each, including some examples, and hopefully it’ll be clearer what I am talking about.

 

Log Shipping

Master / slaves is usually implemented via log shipping. The easiest way to think about log shipping is that the master database sends the slaves (magically; we don’t really care how at this point) instructions on how to directly modify the database files. In other words, conceptually, it is sending them the following:

   writep(fd1, 1024, new[]{ 17,85,124,13,86}, 5);
   writep(fd1, 18432, new[]{ 12,95,34,83,76,32,59}, 7);

Those are very low level modifications, as you can imagine. The advantage here is that it is very easy to capture and replay those changes. The disadvantage is that you cannot really do anything else. Because the changes are happening at the very bottom of the stack, there is no chance to run any sort of logic. We are just writing to the file, same as the master server did.

This is the key reason why it is so hard for a slave to allow writes. The moment it makes any independent write, it opens itself up to the risk that the master will also write to the same place, which would corrupt the data. That is why you have to do a major song and dance if you want to switch the master and the slave. You have to go through all of this trouble to ensure that you never have a scenario where writes are happening on both ends.

Once that happens, you can never ever get those two in sync again. It is just happening at too low a level.
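To make the mechanics concrete, here is a minimal Python sketch of physical log shipping. It assumes the database file can be modeled as an in-memory byte array and that each log record is just an (offset, bytes) pair; `PageWrite`, `Node` and `replay` are hypothetical names, not any real database’s API. Note that `replay` has no idea what the bytes mean, which is exactly why the slave cannot run any logic of its own.

```python
from dataclasses import dataclass, field

@dataclass
class PageWrite:
    """One physical log record: raw bytes written at a byte offset (hypothetical format)."""
    offset: int
    data: bytes

@dataclass
class Node:
    file: bytearray                           # the database file, modeled in memory
    log: list = field(default_factory=list)   # physical records, in write order

    def write(self, offset: int, data: bytes) -> None:
        # The master changes its file and records exactly which bytes it wrote.
        self.file[offset:offset + len(data)] = data
        self.log.append(PageWrite(offset, bytes(data)))

def replay(slave: Node, records: list) -> None:
    # The slave copies bytes blindly: no parsing, no validation, no logic.
    for rec in records:
        slave.file[rec.offset:rec.offset + len(rec.data)] = rec.data

master = Node(bytearray(32 * 1024))
slave = Node(bytearray(32 * 1024))
master.write(1024, bytes([17, 85, 124, 13, 86]))
master.write(18432, bytes([12, 95, 34, 83, 76, 32, 59]))
replay(slave, master.log)
```

If the slave ever wrote to its own file at an offset the master later touched, the two byte arrays would silently diverge, and nothing in these records would tell you which bytes belonged to which logical change.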

Creating a new node, however, is very easy. Make sure to keep the journal around, take a full backup of the database, and move it to another node. Then start shipping the logs over. Because both nodes started at the same point, the logs can be safely applied.
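A sketch of that seeding procedure, under simplifying assumptions: a byte array stands in for the database file, and the journal is a list of (offset, bytes) records whose list index serves as the log position. `Master`, `Replica`, `backup` and `catch_up` are illustrative names only. The key idea is that the backup is paired with the journal position it was taken at, so the new node applies only the records written after it.

```python
class Master:
    def __init__(self, size: int):
        self.file = bytearray(size)   # stand-in for the database file
        self.journal = []             # (offset, bytes) records; list index = log position

    def write(self, offset: int, data: bytes) -> None:
        self.file[offset:offset + len(data)] = data
        self.journal.append((offset, bytes(data)))

    def backup(self):
        # A full backup is only useful together with the journal position it was taken at.
        return bytearray(self.file), len(self.journal)

class Replica:
    def __init__(self, snapshot: bytearray, position: int):
        self.file = snapshot
        self.position = position      # index of the next journal record to apply

    def catch_up(self, journal: list) -> None:
        # Apply only the records written after the backup was taken.
        for offset, data in journal[self.position:]:
            self.file[offset:offset + len(data)] = data
        self.position = len(journal)

master = Master(4096)
master.write(0, b"before backup")
replica = Replica(*master.backup())   # restore the backup on the new node
master.write(100, b"after backup")    # writes keep flowing on the master meanwhile
replica.catch_up(master.journal)      # then start shipping the logs over
```

This only works as long as the master keeps every journal record from the backup position onward, which is why the journal has to be kept around until the new node has caught up.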

Note that this approach is very sensitive to versioning issues. You cannot have even a tiny change in the low level storage format between versions, because then all hell might break loose. This method is very good for generating read replicas. Indeed, this is what it is used for most of the time.

In theory, you can even get it to do failovers, because while the master is down, the slave can write. The problem is how you handle a case where the slave thinks that the master is down, and the master thinks that everything is fine. At that point, you might have both of them accept writes, resulting in an unmergeable situation.

In theory, since they share a common root, you can decide that one of them is the victor and go with that, but that would result in losing data from the losing server, and probably data that you have no actual way of getting back. The changes we keep track of here are very small, and likely too granular to allow you to do anything meaningful to extract the changed information.

Oplog

This is actually quite similar to the log shipping method, but instead of sending the very low level file I/O operations, we’re sending higher level commands. This leads to quite a few benefits as far as we are concerned. The primary server can send its log as:

   set("users/1", {"name": "oren" });
   set("users/2", {"name": "ayende" });
   del("users/1");

Executing this set of instructions on the secondary will result in an identical state on the secondary. Unlike the log shipping option, this actually requires the secondary server to perform work, so it is more expensive than just applying the already computed file updates.
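A minimal Python sketch of oplog replay, assuming a hypothetical entry format of (command, key, value) tuples that mirrors the `set`/`del` commands above. Unlike raw file writes, each entry here has to be interpreted by `apply_op`, which is the extra work the secondary pays for.

```python
def apply_op(db: dict, op: tuple) -> None:
    # Each oplog entry is a logical command, so the secondary must execute it.
    kind, key, *rest = op
    if kind == "set":
        db[key] = dict(rest[0])
    elif kind == "del":
        db.pop(key, None)
    else:
        raise ValueError(f"unknown op: {kind}")

oplog = [
    ("set", "users/1", {"name": "oren"}),
    ("set", "users/2", {"name": "ayende"}),
    ("del", "users/1"),
]

primary, secondary = {}, {}
for op in oplog:
    apply_op(primary, op)
for op in oplog:            # shipped to the secondary and applied in the same order
    apply_op(secondary, op)
```

Order matters: replaying the `del` before the first `set` would leave `users/1` behind, which is why an oplog has to be applied in the same sequence it was generated.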

However, the upside of this is that you can have a far more readable log. It is also much easier to turn a secondary into a primary. Mostly, this is silly; the actual operation is exactly the same. But because you are working at the protocol level rather than the file level, you can get some interesting benefits.

Let us assume that you have the same split brain issue, when both primary & secondary think that they are the primary. In the Log Shipping case, we had no way to reconcile the differences. In the case of Oplog, we can actually do this.  The key here is that we can:

  • Dump one of the servers’ rejected operations into a recoverable state.
  • Attempt to apply both servers’ logs, hoping that they didn’t both work on the same document.

This is the replication mode used by MongoDB. And it has chosen the first approach for handling such conflicts. Indeed, that is pretty much the only choice that it can safely make. Two servers making modifications to the same object is always going to require manual resolution, of course. And it is usually better to have to do this in advance and explicitly rather than “sometimes it works”.
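The first option can be sketched as follows. This is a simplified model, not MongoDB’s actual rollback code: it assumes each node’s oplog is a list of entries that can be compared for equality (real MongoDB entries carry `ts` and `h` fields, as the oplog dump below shows), finds the last entry both servers share, and sets aside everything the losing server wrote after that point instead of silently discarding it. `rollback`, `common_prefix_len` and `apply_op` are hypothetical helpers.

```python
def common_prefix_len(a: list, b: list) -> int:
    # Length of the shared history, i.e. the last point both servers agree on.
    n = 0
    while n < len(a) and n < len(b) and a[n] == b[n]:
        n += 1
    return n

def apply_op(db: dict, op: tuple) -> None:
    kind, key, *rest = op
    if kind == "set":
        db[key] = rest[0]
    elif kind == "del":
        db.pop(key, None)
    else:
        raise ValueError(f"unknown op: {kind}")

def rollback(winner_log: list, loser_log: list):
    """Return the loser's new state and the operations it has to give up."""
    n = common_prefix_len(winner_log, loser_log)
    rejected = loser_log[n:]   # dumped aside for manual recovery, not silently dropped
    state = {}
    for op in winner_log:      # simplification: rebuild by replaying the winner's whole log
        apply_op(state, op)
    return state, rejected

shared = [("set", "users/1", "oren")]
log_a = shared + [("set", "users/3", "from A")]
log_b = shared + [("set", "users/3", "from B"), ("del", "users/1")]
state_b, rejected = rollback(log_a, log_b)
```

Here server A wins: server B ends up with A’s state, and B’s two divergent operations are kept in `rejected` so that someone can look at them and decide what to do.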

You can see some discussion on how merging back divergent writes works in MongoDB here. In fact, continuing to use the same source, you can see the internal oplog in MongoDB here:

   1: // Operations
   2:  
   3: > use test
   4: switched to db test
   5: > db.foo.insert({x:1})
   6: > db.foo.update({x:1}, {$set : {y:1}})
   7: > db.foo.update({x:2}, {$set : {y:1}}, true)
   8: > db.foo.remove({x:1})
   9:  
  10: // Op log view
  11:  
  12: > use local
  13: switched to db local
  14: > db.oplog.rs.find()
  15: { "ts" : { "t" : 1286821527000, "i" : 1 }, "h" : NumberLong(0), "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
  16: { "ts" : { "t" : 1286821977000, "i" : 1 }, "h" : NumberLong("1722870850266333201"), "op" : "i", "ns" : "test.foo", "o" : { "_id" : ObjectId("4cb35859007cc1f4f9f7f85d"), "x" : 1 } }
  17: { "ts" : { "t" : 1286821984000, "i" : 1 }, "h" : NumberLong("1633487572904743924"), "op" : "u", "ns" : "test.foo", "o2" : { "_id" : ObjectId("4cb35859007cc1f4f9f7f85d") }, "o" : { "$set" : { "y" : 1 } } }
  18: { "ts" : { "t" : 1286821993000, "i" : 1 }, "h" : NumberLong("5491114356580488109"), "op" : "i", "ns" : "test.foo", "o" : { "_id" : ObjectId("4cb3586928ce78a2245fbd57"), "x" : 2, "y" : 1 } }
  19: { "ts" : { "t" : 1286821996000, "i" : 1 }, "h" : NumberLong("243223472855067144"), "op" : "d", "ns" : "test.foo", "b" : true, "o" : { "_id" : ObjectId("4cb35859007cc1f4f9f7f85d") } }

You can actually follow the chain from command to oplog entry. The upsert command on line 7 was turned into an insert on line 18, for example. There also appears to be a lot of work done to avoid having the secondaries do any computation, in favor of resolving everything to a simple idempotent operation.

For example, if you have a doc that looks like {counter:1} and you do an update like {$inc:{counter:1}} on the primary, you’ll end up with {counter:2} and the oplog will store {$set:{counter:2}}. The secondaries will replicate that instead of the $inc.

That is a pretty nice feature, since it means that you can apply changes multiple times and end up with the same result. But it still leads to the same end result: you can’t merge divergent writes.

You do get a much better way of going over the data and doing the fixup yourself, but still... I don’t really like it.
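The `$inc` to `$set` translation described above can be sketched like this. It is a simplified illustration that only handles `$inc` and `$set` on top-level fields; the function names are hypothetical and this is not MongoDB’s implementation. The primary does the computation once and logs the resulting value, so applying the logged entry again is harmless.

```python
def apply_update_on_primary(doc: dict, update: dict) -> dict:
    """Apply an update on the primary; return the idempotent entry to put in the oplog."""
    logged = {}
    for name, delta in update.get("$inc", {}).items():
        doc[name] = doc.get(name, 0) + delta
        logged[name] = doc[name]          # log the resulting value, not the delta
    for name, value in update.get("$set", {}).items():
        doc[name] = value
        logged[name] = value
    return {"$set": logged}

def apply_on_secondary(doc: dict, entry: dict) -> None:
    # Secondaries never recompute anything; they just overwrite fields.
    for name, value in entry["$set"].items():
        doc[name] = value

primary = {"counter": 1}
entry = apply_update_on_primary(primary, {"$inc": {"counter": 1}})

secondary = {"counter": 1}
apply_on_secondary(secondary, entry)
apply_on_secondary(secondary, entry)      # replaying the same entry twice is harmless
```

Had the oplog stored the `$inc` itself, applying it twice would leave the secondary at 3 instead of 2.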

Multi write partners

In this mode, we have a set of servers, each of which is familiar with its partners. All incoming writes are accepted and logged. Replication happens by the source server contacting all of the destination servers and asking them: what is the last thing you heard from me? Here are all of my changes since then. Critically, it is at this point that we can trim from the log all of the actions that were already replicated to all of the servers.

A server being down means that the log of changes destined for it will keep growing until the partner is up again, or until we remove that server from our replication destinations.
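Here is a minimal sketch of that push protocol, assuming each partner numbers its own writes with a local sequence number and remembers the last sequence it received from every source. `Partner` and `replicate` are hypothetical names, and conflict handling is deliberately left out. It shows both the “what is the last you heard from me?” exchange and why an offline partner keeps the source’s log from being trimmed.

```python
class Partner:
    def __init__(self, name: str):
        self.name = name
        self.online = True
        self.data = {}
        self.log = []          # (seq, key, value) for writes accepted on this node
        self.next_seq = 1
        self.last_seen = {}    # source name -> last seq received from that source

    def write(self, key, value) -> None:
        self.data[key] = value
        self.log.append((self.next_seq, key, value))
        self.next_seq += 1

    def last_heard_from(self, source: str) -> int:
        return self.last_seen.get(source, 0)

    def receive(self, source: str, entries: list) -> None:
        for seq, key, value in entries:
            self.data[key] = value        # conflict detection omitted in this sketch
            self.last_seen[source] = seq

def replicate(source: Partner, destinations: list) -> None:
    for dest in destinations:
        if not dest.online:
            continue                      # its backlog stays in source.log
        since = dest.last_heard_from(source.name)   # "What is the last you heard from me?"
        dest.receive(source.name, [e for e in source.log if e[0] > since])
    # Trim only what every partner, including offline ones, has already received.
    acked = min(d.last_heard_from(source.name) for d in destinations)
    source.log = [e for e in source.log if e[0] > acked]

a, b, c = Partner("a"), Partner("b"), Partner("c")
c.online = False
a.write("users/1", "oren")
a.write("users/2", "ayende")
replicate(a, [b, c])
pending_while_c_down = len(a.log)   # still 2: c has not received anything yet
c.online = True
replicate(a, [b, c])                # c catches up and the log can finally be trimmed
```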

So far, this is very similar to how you would structure an oplog. The major difference is how you structure the actual data you log. In the oplog scenario, you log the changes that happen to the system, and the only way to act on them is to apply the oplog in the same sequence as it was generated. This leads to a system that can only ever have a single primary node, and that leads to situations where a split brain results in data loss or manual merge steps.

In the multi write partners case, we keep enough context (usually full objects) to give the user better options for resolving the conflict. This also gives us the option of replaying the log in a non-sequential manner.

Note, however, that you cannot just bring a new server online and expect it to start playing nicely. You have to start from a known state, usually a database backup of an existing node. As in the log shipping scenario, the process is essentially: start replicating to the (not yet existing) server, which ensures that the log will be kept until the new server actually exists; back up the database and restore it on the new server; then configure it to accept replication from the source server.

The complexity here is that you need to deal with operations that you might already have applied. That is why this is usually paired with vector clocks, so you can automatically resolve such cases. When a conflict cannot be resolved automatically, it falls back to manual user intervention.
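A sketch of how vector clocks tell these cases apart, assuming a clock is a plain mapping from node name to a per-node counter. `compare` and `merge` are illustrative helpers, not any specific product’s API. Calling `compare(incoming, local)`: `"equal"` or `"before"` means we already have that change and can skip it, `"after"` means it supersedes our copy, and only `"conflict"` needs automatic resolution or a user’s decision.

```python
def compare(a: dict, b: dict) -> str:
    """Compare two vector clocks (node name -> counter)."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # b already includes everything a has seen
    if b_le_a:
        return "after"       # a already includes everything b has seen
    return "conflict"        # concurrent writes: neither clock dominates

def merge(a: dict, b: dict) -> dict:
    # Element-wise maximum: the smallest clock that dominates both inputs.
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}
```

For example, `{"A": 2}` and `{"A": 1, "B": 1}` are in conflict: each side has seen a write the other has not.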

Multi Master

Multi master systems are quite similar to multi write partners, but they are designed to operate independently. It is common for servers to be able to communicate with one another only rarely; for example, a mobile system that can only get connected for a few hours a week. As such, we cannot just cap the size of the log of operations to replicate. In fact, the common way to bring a new server up to speed is just to replicate to it. That means that we need to be able to replicate from essentially any point in the server’s history to a new server.

That works great, as long as you don’t have deletes. Those do tend to make things harder, because you need to keep track of them and replicate them. RavenDB and CouchDB are both multi master systems, for example. Conflicts work pretty much the same way, and we use a vector clock to determine whether a value is in conflict or not.
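A minimal sketch of why deletes have to be remembered, assuming each node stamps every change with an increasing per-node counter (called `etag` here) and that a replica simply asks for all changes after the last counter it saw. `Store` and `pull` are hypothetical names, and conflict detection is omitted. A delete is stored as a tombstone; if it simply removed the key, a node that already had the document and reconnects much later would never learn that it was deleted.

```python
import itertools

class Store:
    def __init__(self):
        self._etags = itertools.count(1)
        self.docs = {}     # key -> (etag, value); a value of None marks a tombstone

    def put(self, key, value) -> None:
        self.docs[key] = (next(self._etags), value)

    def delete(self, key) -> None:
        # Keep a tombstone instead of forgetting the key, so the delete can replicate.
        self.docs[key] = (next(self._etags), None)

    def changes_since(self, etag: int) -> list:
        return sorted((e, k, v) for k, (e, v) in self.docs.items() if e > etag)

    def live(self) -> dict:
        return {k: v for k, (_, v) in self.docs.items() if v is not None}

def pull(dest: Store, source: Store, last_etag: int) -> int:
    """Copy every change after last_etag from source to dest; return the new mark."""
    for etag, key, value in source.changes_since(last_etag):
        if value is None:
            dest.delete(key)
        else:
            dest.put(key, value)
        last_etag = etag
    return last_etag

src = Store()
src.put("users/1", "oren")
src.put("users/2", "ayende")
src.delete("users/1")

fresh = Store()
mark = pull(fresh, src, 0)          # a brand new node replicates from the beginning of time
after_first_pull = fresh.live()

src.put("users/3", "new")
mark = pull(fresh, src, mark)       # an occasional node catching up later
```

The price is the one mentioned above: the tombstone for `users/1` has to stay around for as long as some node might still show up and ask for changes from before it.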

 

Divergent writes

I mentioned this a few times, but I didn’t fully explain it. For my purposes, we assume that we are using 2 servers (and yes, I know all about quorums, etc.; they are not relevant for this discussion) running in master/slave mode.

At some point, the slave thinks that the master is down and takes over, and the master doesn’t notice this and still thinks it is the master. During this time, both servers accept the following writes:

Server A                   Server B
write users/1              write users/2
write users/3              write users/3
delete users/4             delete users/5
delete users/6             write users/6
write users/7              delete all users
set all users to active    write users/8

After those operations happen, we restore communication between the two servers, and they need to decide how to resolve those changes.

Getting down to business

Okay, that is enough talking about what those terms mean. Let us consider the implications of using them. Log shipping is by far the simplest method to use. Well, assuming that you actually have a log mechanism, but most databases do. It is strictly a single writer model, and there is absolutely no way to either resolve divergent writes or even find out what they were. The good thing about log shipping is that it is quite easy to get working without needing to care about the actual data involved. We work directly at the file level; we don’t care at all what the data is. The problem is that we can’t solve even simple conflicts, like writes to different objects. Because we are working at the file level, and all the changes are expressed as file changes, attempting to merge changes from multiple logs would likely result in file corruption. The upside is that it is probably the most efficient way to go about doing this.

Oplog is a step above log shipping, but not a big one. It doesn’t resolve the divergent writes issue. This is now an application level protocol: the log needs to contain information specific to the actual type of data that we store, and you need to write explicit code to handle it. That is nice, but it also requires a strict sequence of all operations. Now, you can try to merge things between different logs. However, you need to worry about conflicts, and more to the point, there is usually nothing in the data itself that will help you even detect them.

Multi write partners are meant to take this up a notch. They keep track of the version history (usually via vector clocks). Here, the situation is more complex, because we need to explicitly decide not only how to deal with conflicts (either resolve them automatically or defer to the user), but also how to handle the distribution of updates. Usually they are paired with some form of logic that tells you where to direct your writes, so all writes for a particular piece of data go to a preferred node, to avoid generating multiple versions. The data needs to contain some information about that, so we keep the vector clock information around. Once we have sent the changes to all our partners, we can drop them, saving space.
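One possible routing policy, sketched under the assumption that the client knows the ordered list of partners and can tell which ones are up: hash the document key to pick a preferred node, and fall back to the next node in the list only when the preferred one is unavailable. The helper names are hypothetical, and this is just one of many ways to choose a preferred node. Failover writes can still land on a different node, which is why the vector clock information is still needed.

```python
import hashlib

def preferred_node(key: str, nodes: list) -> str:
    # Use a stable hash; Python's built-in hash() of a str is randomized per process.
    digest = hashlib.sha1(key.encode("utf-8")).digest()
    return nodes[int.from_bytes(digest[:8], "big") % len(nodes)]

def route_write(key: str, nodes: list, is_up) -> str:
    # Start at the preferred node and fall back to the next one in order if it is down.
    start = nodes.index(preferred_node(key, nodes))
    for i in range(len(nodes)):
        candidate = nodes[(start + i) % len(nodes)]
        if is_up(candidate):
            return candidate
    raise RuntimeError("no node available")
```

Because every client computes the same preferred node for `users/1`, concurrent writes to that document normally meet on one server, and conflicts only appear when that server is unreachable.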

Multi master is meant to ensure that you can have partners that might only see one another occasionally, and it makes no assumptions about the topology. It can handle a node that comes online, gets some data replicated, and drops off for a while (or forever). Each node is fully independent, and while the nodes collaborate with one another, they don’t need each other. The downside here is that we need to keep track of some things forever. In particular, we need to keep track of deletes, to ensure that we can get them to the remote machines.

What about set operations?

Interestingly enough, that is probably the hardest issue to resolve. Consider the case when the following operations happen:

Server A                   Server B
write users/7              delete all users
set all users to active    write users/8 (active: false)

What should be the result of this? There isn’t a really good answer. Should users/8 be set to active: true? What about users/7, should it be deleted or kept?

It gets hard because you don’t have good choices. The hard part here is actually figuring out that you have a conflict, and there isn’t a really good way to handle set operations nicely with conflicts. The common solution is to translate the set operation into the actual operations made (delete users/1, users/2, users/3; write users/8, users/5) and leave it at that. The set based operation is translated into the individual operations that actually happened, and on those we can detect conflicts much more easily.
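That translation can be sketched in Python as follows. The operation names (`write`, `delete`, `delete_all`, `set_all_active`) are hypothetical stand-ins for the operations in the table above, and each server expands its own set-based operations against the state it had when the two servers diverged (assumed here to contain only `users/1`). Once everything is expressed as per-document operations, detecting a conflict is just intersecting the sets of touched keys.

```python
def expand(ops: list, snapshot: dict) -> list:
    """Translate set-based ops into the per-document ops they actually performed."""
    state = dict(snapshot)     # this server's view, starting from the divergence point
    out = []
    for op in ops:
        kind = op[0]
        if kind == "write":
            _, key, doc = op
            state[key] = doc
            out.append(("write", key))
        elif kind == "delete":
            state.pop(op[1], None)
            out.append(("delete", op[1]))
        elif kind == "delete_all":
            out.extend(("delete", key) for key in sorted(state))
            state.clear()
        elif kind == "set_all_active":
            for key in sorted(state):
                state[key] = {**state[key], "active": True}
                out.append(("write", key))
        else:
            raise ValueError(f"unknown op: {kind}")
    return out

def conflicts(ops_a: list, ops_b: list) -> set:
    # A document is in conflict if both servers touched it after they diverged.
    return {key for _, key in ops_a} & {key for _, key in ops_b}

base = {"users/1": {"active": False}}   # assumed state when the servers diverged
ops_a = expand([("write", "users/7", {"active": False}), ("set_all_active",)], base)
ops_b = expand([("delete_all",), ("write", "users/8", {"active": False})], base)
found = conflicts(ops_a, ops_b)
```

With these inputs, `found` is `{"users/1"}`: A set it to active while B deleted it. `users/7` is not flagged, even though B’s “delete all users” arguably meant to cover it; the expansion only captures what each server actually did with the data it had, which is part of why set operations are so hard to get right.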

Log shipping is the easiest to work with, operationally speaking. You know what you get, and you deal with that. Oplog is also simple: you have a single master, and that works. Multi master and multi write partners require you to take explicit notice of conflicts, to select the appropriate node to reduce conflicts, etc.

In practice, at least in the scenarios relating to RavenDB, the ability to take a server offline for weeks or months doesn’t seem to be used that often. The common deployment model is of servers running as steady partners. There are some optimizations that you can do for multi write partners that are hard/impossible to do with multi master.

My current personal preference would be to go with either log shipping or multi write partners. I think that either one of them would be fairly simple to implement and support operationally. I’ll probably discuss an actual design for the time series topic, using one of those options, in my next posts.

Comments

Jahmai Lay, 02/24/2014 10:38 AM

We have primary/secondary master/master setup with Raven at the moment.

It's really great being able to take primary offline to do an upgrade and have the system continue to function fully, including writes, to the secondary, and mostly gives us "zero" downtime.

Federico Lois, 02/24/2014 11:20 AM

Multi-write partners is a sensible trade-off IMHO.

tobi, 02/24/2014 03:35 PM

There's another alternative: Allow writes to all nodes. On commit, you validate the read set cluster-wide and abort the transaction if the read set is not identical on all nodes (i.e. a serializability conflict).

This is an optimistic concurrency model similar to what SQL Server gets with Hekaton in 2014. It is serializable (or weaker, if you want to).

For every commit (or group-commit), there is an O(N) operation for N nodes. That's a weakness.

But: No conflicts and serializable transactions.

Another alternative: Make sharding really easy and automatic. That way you can have the primary/secondary model and still scale writes perfectly.

Ayende Rahien, 02/24/2014 03:41 PM

Tobi, what is a read set? What happens when a node is down? Are you blocking writes? That means a severe limit on cluster size and HA.

Serializable transactions are bad even when you are using a single node. They are a common cause of real performance issues. Let alone on a distributed system.

Also, sharding doesn't really handle HA scenario.

tobi, 02/24/2014 04:00 PM

Ayende, Serializable transactions are a problem when read locks are taken. Serializability can be implemented without read or write locks (Hekaton does that - Hekaton does not have locks at all). On commit you check that everything you ever read (the read set) did not change in the meantime. If that is the case, your transaction is serializable and can commit.

Disadvantage is that transactions can (rarely) fail and must retry.

There's a paper called "SQL Server In-Memory OLTP Internals Overview for CTP1".

Still, your points are valid. This is a very complex design space.

Ayende Rahien, 02/24/2014 04:05 PM

tobi, assume two transactions:

tx1: read rows 1 & 2 from server A, modify row 2, retry on error.

tx2: read rows 1 & 2 from server B, modify row 1, retry on error.

Using your semantics, you now have an infinite loop. And I don't think that this would be rare.

samy, 02/25/2014 10:52 AM

not commenting on the content (it's flying ten miles over my head right now - too tired) but i wanted to pitch in favor of longer blog posts. Shorter posts on this blog don't usually mean bite-sized content and often open up complex subjects and ideas that Ayende handwaves away as if it's really obvious to anyone. I prefer posts that are longer because it forces the writer to chip at a subject a bit more and gives me more chances to have the ah ha! moment.

that's all, you can resume your stratospheric talking :)

Carsten Hansen, 02/25/2014 01:06 PM

Why not add serverid to the primary key?

Afterwards, check for duplicates. In case of duplicates, calculate the average or resolve by other means.

I remember good old Lotus Notes where you had to merge documents manually :-)

Ayende Rahien, 02/25/2014 08:21 PM

Carsten, Because the value isn't local to a server.

Assume we write this to server 1: foo, 2012-02-01, 5

And this to server 2: foo, 2012-02-01, 15

What happens then?

We actually need this to generate a conflict.
