Ayende @ Rahien

Hi!
My name is Oren Eini
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969


Special Offer: 29% discount for all our products


Well, it is nearly the 29th of May, and that means that I have been married for four years.

To celebrate that, I am offering a 29% discount on all our products (RavenDB, NHibernate Profiler, Entity Framework Profiler).

All you have to do is purchase any of our products using the following coupon code:

4th Anniversary

This offer is valid to the end of the month only.

RavenDB Sharding: Adding a new shard to an existing cluster, splitting the shard


In my previous post, we increased the capacity of the cluster by moving all new work to a new set of servers. In this post, I want to deal with a slightly harder problem: what to do when it isn’t new data that is causing the issue, but existing data. We can’t just throw a new server at the problem; we need to actually move data between nodes.

We started with the following configuration:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url = "http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe"}},
    {"NA", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica"}},
};

And what we want is to add another server for EU and NA. Our new topology would be:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url = "http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU1", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe1"}},
    {"NA1", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica1"}},
    {"EU2", new DocumentStore {Url = "http://rvn4:8080", DefaultDatabase = "Europe2"}},
    {"NA2", new DocumentStore {Url = "http://rvn5:8080", DefaultDatabase = "NorthAmerica2"}},
};

There are a couple of things that we need to pay attention to. First, we no longer use the EU / NA shard keys; they have been removed in favor of EU1 & EU2 / NA1 & NA2. We’ll also change the sharding configuration so it splits the new data evenly between the two new nodes for each region (see the previous post for the details on exactly how this is done). But what about the existing data? We need some way of actually moving the data. That is when our ops tools come into play.
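As a sketch of what the even split can look like (this is not the actual RavenDB configuration API, just the shape of the translator function), a simple round-robin resolver alternates new writes between the two nodes of a region:

```csharp
using System.Threading;

public static class EvenShardSplitter
{
    private static int counter;

    // Alternate new writes between the two shards of a region,
    // e.g. "EU" resolves to "EU1", then "EU2", then "EU1" again.
    public static string ResolveShard(string region)
    {
        var n = Interlocked.Increment(ref counter);
        return region + (n % 2 == 1 ? "1" : "2");
    }
}
```

A hash of the document id would work just as well, and has the advantage of being deterministic, so the same document always resolves to the same shard.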

We use the smuggler to move the data between the servers:

Raven.Smuggler.exe between http://rvn2:8080 http://rvn2:8080 --database=Europe --database2=Europe1 --transform-file=transform-1.js --incremental
Raven.Smuggler.exe between http://rvn2:8080 http://rvn4:8080 --database=Europe --database2=Europe2 --transform-file=transform-2.js --incremental
Raven.Smuggler.exe between http://rvn3:8080 http://rvn3:8080 --database=NorthAmerica --database2=NorthAmerica1 --transform-file=transform-1.js --incremental
Raven.Smuggler.exe between http://rvn3:8080 http://rvn5:8080 --database=NorthAmerica --database2=NorthAmerica2 --transform-file=transform-2.js --incremental

The commands are pretty similar, with just different options, so let us figure out what is going on. We are asking the smuggler to move the data between two databases in an incremental fashion, while applying a transform script. The transform-1.js file looks like this:

function(doc) {
    var id = doc['@metadata']['@id'];
    // split on the numeric suffix of the document id
    var node = (parseInt(id.substring(id.lastIndexOf('/') + 1)) % 2);

    if (node == 1)
        return null; // skip - this document belongs in the other new shard

    // rewrite the shard id, e.g. "EU" becomes "EU1"
    doc['@metadata']['Raven-Shard-Id'] = doc['@metadata']['Raven-Shard-Id'] + (node + 1);

    return doc;
}

And transform-2.js is exactly the same, except that it returns early if node is 0. In this way, we are able to split the data between the two new servers.

Note that because we use an incremental approach, we can do this even if it takes a long while; the window of time when we actually switch over is very narrow, and requires us to copy only the recently changed data.

That still leaves the question of how we are going to deal with old ids. We are still going to have things like “EU/customers/###” in the database, even if those documents are on one of the two new nodes. We handle this, like most low level sharding behaviors, by customizing the sharding strategy. In this case, we modify the PotentialShardsFor(…) method:

public override IList<string> PotentialShardsFor(ShardRequestData requestData)
{
    var potentialShardsFor = base.PotentialShardsFor(requestData);
    if (potentialShardsFor.Contains("EU"))
    {
        potentialShardsFor.Remove("EU");
        potentialShardsFor.Add("EU1");
        potentialShardsFor.Add("EU2");
    }
    if (potentialShardsFor.Contains("NA"))
    {
        potentialShardsFor.Remove("NA");
        potentialShardsFor.Add("NA1");
        potentialShardsFor.Add("NA2");
    }
    return potentialShardsFor;
}

In this case, we are doing a very simple thing: when the default shard resolution strategy detects that we want to go to the old EU node, we’ll tell it to go to both EU1 and EU2. A more comprehensive solution would narrow it down to the exact server, but that depends on exactly how you split the data, and is left as an exercise for the reader.
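As a nudge in that direction: if the split followed the modulo-2 scheme of the transform scripts above (even numeric suffixes to the “1” node, odd ones to the “2” node), the narrowing could be done with a small hypothetical helper like this:

```csharp
using System;

public static class ShardNarrowing
{
    // Maps an old-style id such as "EU/customers/4" to the exact new
    // shard, mirroring the modulo-2 split of the transform scripts:
    // even numeric suffixes went to the "1" node, odd ones to "2".
    public static string ExactShardFor(string oldShard, string documentId)
    {
        var numericPart = documentId.Substring(documentId.LastIndexOf('/') + 1);
        var node = int.Parse(numericPart) % 2;
        return oldShard + (node + 1);
    }
}
```

With that in hand, PotentialShardsFor could return a single shard for queries by id, instead of fanning out to both.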

RavenDB Sharding: Adding a new shard to an existing cluster, the easy way


Continuing the theme of giving full answers to interesting questions from the mailing list here on the blog, we have the following issue.

We have a sharded cluster, and we want to add a new node to it. How do we do that? I’ll discuss a few ways in which you can handle this scenario. But first, let us lay out the actual scenario.

We’ll use the Customers & Invoices system, and we put the data in the various shards along the following scheme:

  • Customers – sharded by region
  • Invoices – sharded by customer
  • Users – shared database (not sharded)

We can configure this using the following:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url = "http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe"}},
    {"NA", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica"}},
};
ShardStrategy shardStrategy = new ShardStrategy(shards)
    .ShardingOn<Company>(company => company.Region, region =>
    {
        switch (region)
        {
            case "USA":
            case "Canada":
                return "NA";
            case "UK":
            case "France":
                return "EU";
            default:
                return "Shared";
        }
    })
    .ShardingOn<Invoice>(invoice => invoice.CompanyId)
    .ShardingOn<User>(user => "Shared");

So far, so good. Now we have so much work that we can’t make do with just two servers for customers & invoices; we need more. We change the sharding configuration to include 2 new servers, and we get:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url = "http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe"}},
    {"NA", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica"}},
    {"EU2", new DocumentStore {Url = "http://rvn4:8080", DefaultDatabase = "Europe-2"}},
    {"NA2", new DocumentStore {Url = "http://rvn5:8080", DefaultDatabase = "NorthAmerica-2"}},
};

var shardStrategy = new ShardStrategy(shards);
shardStrategy.ShardResolutionStrategy = new NewServerBiasedShardResolutionStrategy(shards.Keys, shardStrategy)
    .ShardingOn<Company>(company => company.Region, region =>
    {
        switch (region)
        {
            case "USA":
            case "Canada":
                return "NA";
            case "UK":
            case "France":
                return "EU";
            default:
                return "Shared";
        }
    })
    .ShardingOn<Invoice>(invoice => invoice.CompanyId)
    .ShardingOn<User>(user => user.Id, userId => "Shared");

Note that we have a new shard resolution strategy. What is that? This is how we control lower level details of the sharding behavior; in this case, we want to control where we’ll write new documents.

public class NewServerBiasedShardResolutionStrategy : DefaultShardResolutionStrategy
{
    public NewServerBiasedShardResolutionStrategy(IEnumerable<string> shardIds, ShardStrategy shardStrategy)
        : base(shardIds, shardStrategy)
    {
    }

    public override string GenerateShardIdFor(object entity, ITransactionalDocumentSession sessionMetadata)
    {
        var generatedShardId = base.GenerateShardIdFor(entity, sessionMetadata);
        if (entity is Company)
        {
            // until Aug 1st, 2015, send every new company to the new ("2")
            // servers; after that, send two thirds of them there
            if (DateTime.Today < new DateTime(2015, 8, 1) ||
                DateTime.Now.Ticks % 3 != 0)
            {
                return generatedShardId + "2";
            }
        }
        return generatedShardId;
    }
}

What is the logic? When we have a new company, the GenerateShardIdFor(entity) method is called, and for the next 3 months, we’ll create all new companies (and as a result, their invoices) on the new servers. After the 3 months have passed, we’ll still generate two thirds of the new companies on the new servers, and one third on the old ones.
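The routing rule can be boiled down to a pure function. The cutoff date and tick source are parameters here so the behavior is testable; the strategy above reads DateTime.Today and DateTime.Now directly:

```csharp
using System;

public static class NewShardBias
{
    // Before the cutoff date, every new company goes to the new ("2")
    // server; after it, roughly two thirds still do (ticks % 3 != 0).
    public static string Route(string baseShardId, DateTime today, long ticks)
    {
        if (today < new DateTime(2015, 8, 1) || ticks % 3 != 0)
            return baseShardId + "2";
        return baseShardId;
    }
}
```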

Note that in this case, we didn’t have to modify any data whatsoever. And over time, the data will balance itself out between the servers. In my next post, we’ll deal with how we actually split an existing shard into multiple shards.

RavenDB Sharding: Enabling shards for an existing database


A question came up in the mailing list: how do we enable sharding for an existing database? I’ll deal with data migration in this scenario in a later post.

The scenario is that we have a very successful application, and we start to feel the need to move the data to multiple shards. Currently all the data is sitting in the RVN1 server. We want to add RVN2 and RVN3 to the mix. For this post, we’ll assume that we have the notion of Customers and Invoices.

Previously, we accessed the database using a simple document store:

var documentStore = new DocumentStore
{
	Url = "http://RVN1:8080",
	DefaultDatabase = "Shop"
};

Now, we want to move to a sharded environment, so we want to write it like this: existing data is going to stay where it is, and new data will be sharded according to geographical location.

var shards = new Dictionary<string, IDocumentStore>
{
	{"Origin", new DocumentStore {Url = "http://RVN1:8080", DefaultDatabase = "Shop"}},//existing data
	{"ME", new DocumentStore {Url = "http://RVN2:8080", DefaultDatabase = "Shop_ME"}},
	{"US", new DocumentStore {Url = "http://RVN3:8080", DefaultDatabase = "Shop_US"}},
};

var shardStrategy = new ShardStrategy(shards)
	.ShardingOn<Customer>(c => c.Region)
	.ShardingOn<Invoice>(i => i.Customer);

var documentStore = new ShardedDocumentStore(shardStrategy).Initialize();

This wouldn’t actually work; we are going to have to do a bit more. To start with, what happens when we don’t have a 1:1 match between region and shard? That is when the translator becomes relevant:

.ShardingOn<Customer>(c => c.Region, region =>
{
    switch (region)
    {
        case "Middle East":
            return "ME";
        case "USA":
        case "United States":
        case "US":
            return "US";
        default:
            return "Origin";
    }
})

We basically say that we map several region values into a single shard. But that isn’t enough. Newly saved documents are going to have the shard prefix, so saving a new customer and invoice in the US shard will show up as:

[image: a new customer and invoice saved with the US shard id prefix]

But existing data doesn’t have this (created without sharding).

[image: existing documents without a shard id prefix]

So we need to take some extra effort to let RavenDB know about them. We do this using the following two functions:

Func<string, string> potentialShardToShardId = val =>
{
    var start = val.IndexOf('/');
    if (start == -1)
        return val;
    var potentialShardId = val.Substring(0, start);
    if (shards.ContainsKey(potentialShardId))
        return potentialShardId;
    // this is probably an old id, let us use it
    return "Origin";
};

Func<string, string> regionToShardId = region =>
{
    switch (region)
    {
        case "Middle East":
            return "ME";
        case "USA":
        case "United States":
        case "US":
            return "US";
        default:
            return "Origin";
    }
};

We can then register our sharding configuration like so:

var shardStrategy = new ShardStrategy(shards)
    .ShardingOn<Customer, string>(c => c.Region, potentialShardToShardId, regionToShardId)
    .ShardingOn<Invoice, string>(x => x.Customer, potentialShardToShardId, regionToShardId);

That takes care of handling both new and old ids, and lets RavenDB understand how to query things in an optimal fashion. For example, a query on all invoices for ‘customers/1’ will only hit the RVN1 server.
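To make the old-id vs. new-id behavior concrete, here is the same resolution logic extracted into a standalone method, so it can be exercised directly:

```csharp
using System.Collections.Generic;

public static class ShardIdResolution
{
    // Ids written after sharding was enabled carry a shard prefix
    // ("US/customers/1" -> "US"); ids from before sharding do not
    // ("customers/1"), and those belong to the Origin shard.
    public static string Resolve(string val, ICollection<string> shardNames)
    {
        var start = val.IndexOf('/');
        if (start == -1)
            return val;
        var potentialShardId = val.Substring(0, start);
        if (shardNames.Contains(potentialShardId))
            return potentialShardId;
        // no known shard prefix - this is an old, pre-sharding id
        return "Origin";
    }
}
```

So “US/customers/1” resolves to the US shard, while the pre-sharding “customers/1” resolves to Origin.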

However, we aren’t done yet. New customers that don’t belong to the Middle East or USA will still go to the old server, and we don’t want any modification to the ids there. We can tell RavenDB how to handle that like so:

var defaultModifyDocumentId = shardStrategy.ModifyDocumentId;
shardStrategy.ModifyDocumentId = (convention, shardId, documentId) =>
{
    if(shardId == "Origin")
        return documentId;

    return defaultModifyDocumentId(convention, shardId, documentId);
};

That is almost the end. There is one final issue that we need to deal with: the old documents, created before we used sharding, don’t have the required sharding metadata. We can fix that using a store listener. So we have:

 var documentStore = new ShardedDocumentStore(shardStrategy);
 documentStore.RegisterListener(new AddShardIdToMetadataStoreListener());
 documentStore.Initialize();

Where the listener looks like this:

 public class AddShardIdToMetadataStoreListener : IDocumentStoreListener
 {
     public bool BeforeStore(string key, object entityInstance, RavenJObject metadata, RavenJObject original)
     {
         if (metadata.ContainsKey(Constants.RavenShardId) == false)
         {
             metadata[Constants.RavenShardId] = "Origin";// the default shard id
         }
         return false;
     }

     public void AfterStore(string key, object entityInstance, RavenJObject metadata)
     {
     }
 }

And that is it. I know that there seems to be quite a lot going on here, but it basically breaks down to three actions that we take:

  • Modify the existing metadata to add the sharding server id via the listener.
  • Modify the document id convention so documents on the old server won’t have a designation (optional).
  • Modify the sharding configuration so we’ll understand that documents without a shard prefix actually belong to the Origin shard.

And that is pretty much it.

Why RavenDB isn’t written in F#, or the cost of the esoteric choice


In a recent post, a commenter suggested that using F# rather than C# would dramatically reduce the code size (measured in line numbers).

My reply to that was:

F# would also lead to a lot more complexity, reduced participation in the community, harder to find developers and increased costs all around.

And the data to back up this statement:

[images: job search results, comparing the number of C# developer jobs with the number of F# developer jobs]

Nitpicker corner: now, I realize that this is a sensitive topic, so I’ll note that this isn’t meant to be a scientific observation. It is a data point that amply demonstrates my point; I’m not going to run a full bore study. And yes, those numbers are about jobs, not people, but I’m assuming that the numbers are at least roughly comparable.

The reply to this was:

You have that option to hire cheaper developers. I think that the cheapest developers usually will actually increase your costs. But if that is your way, then I wish you good luck, and I accept that as an answer. How about "a lot more complexity"?

Now, let me try to explain my thinking. In particular, I would strongly disagree with the “cheapest developers” mentality. That is very far from what I’m trying to achieve. You usually get what you pay for, and trying to save on software development costs when your product is software is pretty much the definition of penny wise and pound foolish.

But let us ignore such crass terms as money and look at availability. There are fewer than 500 jobs for F# developers (with salary ranges implying that there isn’t a whole lot of F# developers queuing up for those jobs). There are tens of thousands of jobs for C# developers, and again, the salary ranges suggest that there isn’t a dearth of qualified candidates that would cause demand to raise the costs. From those numbers, and my own experience, I can say the following.

There are a lot more C# developers than there are F# developers. I know that this is a stunning conclusion, likely to shatter the worldview of certain people. But I think that you would find it hard to refute that. Now, let us try to build on this conclusion.

First, there was the original point, that F# leads to a reduced number of lines. I’m not going to argue that, mostly because software development isn’t an issue of who can type the most. The primary costs for development are design, testing, debugging, production proofing, etc. The act of actually typing is pretty unimportant.

For fun, I checked the line count numbers for similar projects (RavenDB & CouchDB). The count of lines in the Raven.Database project is roughly 100K. The count of lines in the CouchDB src folder is roughly 45K. CouchDB is written in Erlang, which is another functional language, so we are at least not comparing apples to camels here. We’ll ignore things like different feature sets, different platforms, and the different languages for now, and just say that an F# program can deliver with 25% of the lines of code of a comparable C# program.

Note that I’m not actually agreeing with this statement, I’m just using this as a basis for the rest of this post. And to (try to) forestall nitpickers. It is easy to show great differences in development time and line of code in specific cases where F# is ideally suited to the task. But we are talking about general purpose usage here.

Now, for the sake of argument, we’ll even assume that the cost of F# development is 50% of the cost of C# development. That is, that the reduction in line count actually has a real effect on time and efficiency. In other words, if an F# program is 25% of the size of a similar C# program, we’ll not assume that it takes 4 times as much time to write.

Where does this leave us? It leaves us with a potential pool of people to hire that is vanishingly small. What are the implications of writing software in a language that fewer people are familiar with?

Well, it is harder to find people to hire. That is true not only for people that you hire “as is”. Let us assume that you’re going to give those people additional training after hiring them, so they would know F# and can work on your product. An already steep learning curve has just become that much steeper. Not only that, but this additional training means that the people you hire are more expensive (there is an additional period in which they are only learning). In addition to all of that, it will be harder to hire people, not just because you can’t find people already experienced with F#, but because people don’t want to work for you.

Most developers at least try to pay attention to the market, and they make a very simple calculation. If I spend the next 2 – 5 years working in F#, what kind of hirability am I going to have in the end? Am I going to be one of those trying to get the < 500 F# jobs, or am I going to be in the position to find a job among the tens of thousands of C# jobs?

Now, let us consider another aspect of this: the community around a project. I actually have a pretty hard time finding any significant F# OSS projects. But leaving that aside, looking at the number of contributors, the ability of users to go into your codebase and look for themselves is a major advantage. We have had users skip the bug report entirely and just send us a Pull Request for an issue they ran into; others have contributed (significantly) to the project. That is possible only because there is a wide appeal. If the language is not well known, the number of people that are going to spend the time and do something with it is going to be dramatically lower.

Finally, there is the complexity angle. Consider any major effort required. Recently, we have been working on porting RavenDB to Linux. Now, F# works on Linux, but anyone that we would go to in order to help us port RavenDB to Linux would have this additional (rare) requirement: they would need to understand F# as well as Linux & Mono. Any problem that we would run into would have to first be simplified to a C# sample so it could be readily understood by people who aren’t familiar with F#, etc.

To go back to the beginning: using F# might have reduced the lines of code counter, but it wouldn’t reduce the time to actually build the software, and it would limit the number of people that can participate in the project, either as employees or Open Source contributors.

RavenDB 3.5 Features: Data Exploration


RavenDB does a pretty great job of being a production database; in fact, we designed it upfront to only have features that make sense for robust production systems.

In particular, we don’t have any form of ad-hoc queries. A query always hits an index, so it is very fast. Even what we call dynamic queries in RavenDB actually create an index behind the scenes. This is pretty awesome for normal production usage, but it does have some limitations when you want to explore the data. That can be because you are a developer trying to find a particular something, and you just want to quickly fire off random queries; you don’t care about the costs, and you don’t want to generate indexes. Or you can be an admin that needs to get a particular report from the system, and you want to play around with the details until you get everything right.

In order to serve those needs, RavenDB 3.5 is going to have a really nice feature, explicit data exploration.

For example, let us say that I want to count the number of unique words in all of my posts, I can do it using the following:

[image: a data exploration query in the studio, counting the unique words in all posts]

Note that the actual query is pretty meaningless, and I’m writing this at 1AM with a baby nearby that makes funny noises, so the Linq statement there works, but can probably be better.

The point here is to demo what is going on. We write a simple Linq statement, run it against our database, and then gather the results back. It is like having LinqPad directly inside the RavenDB studio. In fact, that is the number one scenario that we envision for this feature: replacing LinqPad usage with a native capability.
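The query itself isn’t legible in the screenshot, but a plain Linq statement of the kind described (counting the unique words across all posts) might look like the following; the string array of post bodies is a stand-in for whatever the actual document shape is:

```csharp
using System;
using System.Linq;

public static class ExplorationQuery
{
    // Split every post body into words, normalize case, and count the
    // distinct words - the sort of throwaway query that data
    // exploration is meant for.
    public static int CountUniqueWords(string[] postBodies)
    {
        return postBodies
            .SelectMany(body => body.Split(new[] { ' ', '\t', '\n' },
                StringSplitOptions.RemoveEmptyEntries))
            .Select(word => word.ToLowerInvariant())
            .Distinct()
            .Count();
    }
}
```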

Now, some caveats. As you can see, you can choose to limit the query duration as well as the number of documents it will operate on. That gives us a quick way to explore the data without putting too much load on the server. You can even take the output here and throw it directly into Excel. “Sam, can you give me a breakdown of orders this year by month and country? Just email me the Excel spreadsheet”.

Note that this is intended as a user feature; it isn’t something that we provide an API for. It is there for admins or developers that are figuring things out. An admin feature, not something that you want to use in production.

Work stealing in the presence of startup / shutdown costs


I mentioned that we have created our own thread pool implementation in RavenDB to handle our specific needs. A common scenario that ended up quite costly for us was the notion of parallelizing similar work.

For example, I have 15,000 documents to index. That means that we need to go over each of the documents and apply the indexing function. That is an embarrassingly parallel task, so it is quite easy. One easy way to do that would be something like this:

foreach (var doc in docsToIndex)
	ThreadPool.QueueUserWorkItem(_ => IndexFunc(new[] { doc }));

Of course, that generates 15,000 entries for the thread pool, but that is fine.

Except that there is an issue here: we need to do stuff with the results of the indexing, namely write them to the index. That means that even though we can parallelize the work, we still have a non trivial amount of startup & shutdown costs. Just running the code like this would actually be much slower than running it in single threaded mode.

So, let us try a slightly better method:

foreach (var partition in docsToIndex.Partition(docsToIndex.Length / Environment.ProcessorCount))
	ThreadPool.QueueUserWorkItem(_ => IndexFunc(partition));

If my machine has 8 cores, then this will queue 8 tasks to the thread pool, each indexing just under 2,000 documents. Which is pretty much what we have been doing until now.

Except that this means that we have to incur the startup/shutdown costs a minimum of 8 times.

A better way is here:

var partitions = new ConcurrentQueue<ArraySegment<JsonDocument>>(
	docsToIndex.Partition(docsToIndex.Length / Environment.ProcessorCount));
for (var i = 0; i < Environment.ProcessorCount; i++)
{
	ThreadPool.QueueUserWorkItem(_ =>
	{
		ArraySegment<JsonDocument> first;
		if (partitions.TryDequeue(out first) == false)
			return; // all the work is already taken, exit early

		IndexFunc(Pull(first, partitions));
	});
}

IEnumerable<JsonDocument> Pull(ArraySegment<JsonDocument> first, ConcurrentQueue<ArraySegment<JsonDocument>> partitions)
{
	while (true)
	{
		for (var i = 0; i < first.Count; i++)
			yield return first.Array[i + first.Offset];

		if (partitions.TryDequeue(out first) == false)
			break;
	}
}

Now something interesting is going to happen. We are scheduling 8 tasks, as before, but instead of allocating 8 static partitions, we are saying that when a task starts running, it will grab a partition of the data and process it. When it is done with that, it will try to get a new partition, in the same context, so it doesn’t have to pay new startup/shutdown costs.

Even more interesting, it is quite possible (and common) for the work to already be done by the time some of the tasks get a chance to execute (all the indexing is done, but we still have a task queued that didn’t get a chance to run). In that case we exit early, and incur no costs.

The fun thing about this method is what happens under load when you have multiple indexes running. In that case, we’ll be running this for each of the indexes. It is quite likely that each core will be running a single index. Some indexes are going to be faster than the others, and complete first, consuming all the documents that they were told to do. That means that the tasks belonging to those indexes will exit early, freeing those cores to run the code for the slower indexes, which haven’t completed yet.

This gives us dynamic resource allocation. The more costly indexes get to run on more cores, while we don’t have to pay the startup / shutdown costs for the fast indexes.
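The whole pattern can be boiled down to a self-contained sketch; ints stand in for documents, and a counter stands in for the costly startup/shutdown work:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PullPartitionsDemo
{
    // Each worker opens at most one indexing "session" (the expensive
    // startup/shutdown part), then keeps pulling partitions from the
    // shared queue until it is drained. Workers that find the queue
    // already empty exit without opening a session at all.
    public static Tuple<int, int> Run(int[][] partitions, int workers)
    {
        var queue = new ConcurrentQueue<int[]>(partitions);
        int processed = 0, sessions = 0;
        var tasks = Enumerable.Range(0, workers).Select(_ => Task.Run(() =>
        {
            int[] partition;
            if (queue.TryDequeue(out partition) == false)
                return; // nothing left to do - no session cost incurred
            Interlocked.Increment(ref sessions); // one startup per worker, not per partition
            do
            {
                Interlocked.Add(ref processed, partition.Length);
            } while (queue.TryDequeue(out partition));
        })).ToArray();
        Task.WaitAll(tasks);
        return Tuple.Create(processed, sessions);
    }
}
```

With 2 partitions and 8 workers, at most 2 sessions are ever opened; the other tasks exit immediately, which is exactly the early-exit behavior described above.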
