Ayende @ Rahien

RavenDB Sharding–Map/Reduce in a cluster

In my previous post, I introduced RavenDB Sharding and discussed how we can use sharding in RavenDB. We discussed both blind sharding and data-driven sharding. Today I want to introduce another aspect of RavenDB Sharding: using Map/Reduce to gather information from multiple shards.

We start by defining a map/reduce index. In this case, we want to look at the invoice totals per date. We define the index like this:

public class InvoicesAmountByDate : AbstractIndexCreationTask<Invoice, InvoicesAmountByDate.ReduceResult>
{
    public class ReduceResult
    {
        public decimal Amount { get; set; }
        public DateTime IssuedAt { get; set; }
    }

    public InvoicesAmountByDate()
    {
        Map = invoices =>
              from invoice in invoices
              select new
              {
                  invoice.Amount,
                  invoice.IssuedAt
              };

        Reduce = results =>
                 from result in results
                 group result by result.IssuedAt
                 into g
                 select new
                 {
                     Amount = g.Sum(x => x.Amount),
                     IssuedAt = g.Key
                 };
    }
}

And then we execute the following code:

using (var session = documentStore.OpenSession())
{
    var asian = new Company { Name = "Company 1", Region = "Asia" };
    session.Store(asian);
    var middleEastern = new Company { Name = "Company 2", Region = "Middle-East" };
    session.Store(middleEastern);
    var american = new Company { Name = "Company 3", Region = "America" };
    session.Store(american);

    session.Store(new Invoice { CompanyId = american.Id, Amount = 3, IssuedAt = DateTime.Today.AddDays(-1) });
    session.Store(new Invoice { CompanyId = asian.Id, Amount = 5, IssuedAt = DateTime.Today.AddDays(-1) });
    session.Store(new Invoice { CompanyId = middleEastern.Id, Amount = 12, IssuedAt = DateTime.Today });
    session.SaveChanges();
}

We use three-way sharding, based on the region of the company, so we actually have the following documents on three different servers:

First server, Asia:

image

Second server, Middle East:

image

Third server, America:

image

Now, let us see what happens when we use the map/reduce query:

using (var session = documentStore.OpenSession())
{
    var reduceResults = session.Query<InvoicesAmountByDate.ReduceResult, InvoicesAmountByDate>()
        .ToList();

    foreach (var reduceResult in reduceResults)
    {
        string dateStr = reduceResult.IssuedAt.ToString("MMM dd, yyyy", CultureInfo.InvariantCulture);
        Console.WriteLine("{0}: {1}", dateStr, reduceResult.Amount);
    }
    Console.WriteLine();
}

As you can see, once again our code makes no distinction about using sharding; we just query normally. The results, however, are quite interesting:

image

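As you can see, we got the correct results, cluster-wide: yesterday's two invoices (3 and 5), which belong to companies in different regions, are summed into a single total of 8, and today's total is 12.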

RavenDB was able to query all the servers in the cluster for their results, reduce them again, and get us the total across all three servers.
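To make the "reduce them again" step concrete, here is a minimal, self-contained sketch that does not touch RavenDB at all. It assumes each shard has already produced its own reduce results, and it simply applies the same grouping as the index's Reduce function to the combined list. The class name ShardedReduceSketch and the ReReduce helper are made up for illustration; this is not RavenDB's actual client code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class ReduceResult
{
    public decimal Amount { get; set; }
    public DateTime IssuedAt { get; set; }
}

// Hypothetical helper, for illustration only.
public static class ShardedReduceSketch
{
    // Applies the same grouping as the index's Reduce function to results
    // that were already reduced on each shard.
    public static List<ReduceResult> ReReduce(IEnumerable<IEnumerable<ReduceResult>> perShardResults)
    {
        return perShardResults
            .SelectMany(shardResults => shardResults)
            .GroupBy(result => result.IssuedAt)
            .Select(g => new ReduceResult { Amount = g.Sum(x => x.Amount), IssuedAt = g.Key })
            .OrderBy(result => result.IssuedAt) // ordering added here only to make the output stable
            .ToList();
    }

    public static void Main()
    {
        var today = DateTime.Today;
        var yesterday = today.AddDays(-1);

        // One already-reduced result list per shard, matching the invoices stored above.
        var asia = new[] { new ReduceResult { Amount = 5, IssuedAt = yesterday } };
        var middleEast = new[] { new ReduceResult { Amount = 12, IssuedAt = today } };
        var america = new[] { new ReduceResult { Amount = 3, IssuedAt = yesterday } };

        foreach (var r in ReReduce(new[] { asia, middleEast, america }))
            Console.WriteLine("{0:MMM dd, yyyy}: {1}", r.IssuedAt, r.Amount);
    }
}
```

This works because the reduce output has the same shape as its input and summing is associative, so reducing the per-shard totals gives the same answer as reducing all the invoices in one place.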

And that, my friends, is truly awesome.

Posted By: Ayende Rahien

Comments

03/21/2012 11:08 AM by Jonty

Nice. Presumably paging and sorting are not supported?

03/21/2012 11:11 AM by Ayende Rahien

Jonty, paging and sorting are supported.

03/21/2012 11:14 AM by Jonty

How does that work then? Presumably you'd have to sort on each server, return the number of results from each server equal to the page size and do a further sort on the client.

03/21/2012 11:19 AM by Ayende Rahien

Jonty, Yes, that is what we are doing.

03/21/2012 04:37 PM by Geert Baeyaert

Ayende, is that also how it works for pages other than the first page?

Let's say we want documents n through m. Are you saying that you get the first m documents from each server, and then on the client sort and throw away the unnecessary documents?

03/21/2012 04:53 PM by morcs

@Geert I guess that's the case, except it would be the reduce results that are being returned and thrown away, not whole documents!

03/21/2012 07:08 PM by Ayende Rahien

Geert, no, it does not. We get results N through M from each server, then sort them, and give you the required page size from there.

03/21/2012 09:11 PM by Geert Baeyaert

Ok, I must be missing something.

Let's say we want documents 3 through 5, sorted by key. There are 2 servers in the cluster. Server A contains docs with keys A, B, C, E, H. Server B contains docs with keys D, F, G, I, J.

The expected result without clustering is C, D, E.

However, with clustering, server A returns C, E, H and server B returns G, I, J, which, after sorting and taking the first 3, gives you C, E, G.
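The example above can be checked with a small, self-contained sketch that uses plain in-memory lists in place of servers. It compares skipping on every shard with fetching the first m keys from every shard and skipping once after the merge. The class and method names are made up for illustration, and nothing here reflects how the RavenDB client is actually implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class ShardedPagingSketch
{
    // Applies the same Skip/Take on every shard, merges, then takes one page.
    public static List<string> SkipOnEachShard(IEnumerable<IEnumerable<string>> shards, int skip, int take)
    {
        return shards
            .SelectMany(shard => shard.OrderBy(key => key, StringComparer.Ordinal).Skip(skip).Take(take))
            .OrderBy(key => key, StringComparer.Ordinal)
            .Take(take)
            .ToList();
    }

    // Fetches the first (skip + take) keys from every shard, merges, then applies Skip/Take once.
    public static List<string> SkipAfterMerge(IEnumerable<IEnumerable<string>> shards, int skip, int take)
    {
        return shards
            .SelectMany(shard => shard.OrderBy(key => key, StringComparer.Ordinal).Take(skip + take))
            .OrderBy(key => key, StringComparer.Ordinal)
            .Skip(skip)
            .Take(take)
            .ToList();
    }

    public static void Main()
    {
        var serverA = new[] { "A", "B", "C", "E", "H" };
        var serverB = new[] { "D", "F", "G", "I", "J" };
        var shards = new[] { serverA, serverB };

        // Documents 3 through 5 means skipping 2 and taking 3.
        Console.WriteLine(string.Join(", ", SkipOnEachShard(shards, 2, 3))); // C, E, G
        Console.WriteLine(string.Join(", ", SkipAfterMerge(shards, 2, 3)));  // C, D, E
    }
}
```

The second approach returns the globally correct page, at the cost of pulling skip + take results from every shard, which grows with the page number.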

03/21/2012 09:49 PM by Jonty

I'm with Geert here. Logically you'd have to bring back m results from each server, as the first record on any given server could be in the desired page. Unless you did some kind of round robin between servers before returning to the client.

03/22/2012 09:56 AM by Ayende Rahien

Geert, yes, you are correct. You cannot get a globally paged sharded result without having access to all of the information. Now, we may provide feedback in the future that will allow you to fine-tune those per shard, but this is really hard to do, and places an undue burden on the client.

More than that, the actual scenario doesn't look too good from a business point of view either. In a sharded environment, you respect the sharding, and you don't try to go ahead and do things like this on the fly. You would modify your behavior so your queries respect the shard boundaries.

03/22/2012 11:07 AM by morcs

So would the result of (hope the formatting works):

session.Query...().Skip(10).Take(10).ToList()

return unexpected results under sharding? Or would it get 20 results from each server and do the Skip and Take client-side?

03/22/2012 11:08 AM by morcs

I guess what I really should do is download RavenDB and try it for myself, since Ayende's put so much effort into making that as simple as possible to do :)

03/22/2012 11:54 AM by Ayende Rahien

Morcs, it would get results 10 through 20 from each server, sort them, and give you 10 results.

03/22/2012 09:14 PM by Thomas Krause

@morcs and Jonty

Doing sorting and paging at the same time, as described here, is hard. One solution I could think of is using the same attribute for sharding and sorting.

So in the example above you could sort first by region and then by name. This would get the first pages from server A, the next pages from server B and so on. I'm not sure if this is already supported by RavenDB, but it shouldn't be too complicated. In case you reach the end of one shard and only get a partial page, you would need to do another request for the next shard to fill the page.

03/23/2012 09:17 AM by morcs

Understood :) I've not had to use anything like sharding before but it would worry me that:

session.Query(...).OrderBy(...).Skip(x)

would return different, possibly confusing results in a sharded setup, but I do understand why.

Would the "safe by default" principle suggest that we should get an exception when trying to use Skip on an IOrderedQueryable in a sharded setup?

03/23/2012 10:59 AM by Ayende Rahien

Morcs, We provide an extension point for you to inject your own behavior, so I am not sure if an exception would be a good idea here.

03/31/2012 02:35 AM by petar

Is there a path that one can take to turn the sharded DB back into a single instance?

Thanks.

04/01/2012 08:34 AM by Ayende Rahien

Petar, sure, take all the data from the sharded instances and put it in one box. Then use the standard DocumentStore.

04/02/2012 05:44 PM by Sonic

Is there a way to "partition/shard" the data by a dynamic/arbitrary sequential value like date? In other words, could I tell RavenDB to partition a DocumentStore by month or do I have to explicitly define the shard like in your example of region?

04/03/2012 12:05 AM by Ayende Rahien

Sonic, no, you don't have to define the shards explicitly; you can do it easily with RavenDB. You need to provide two values to the ShardingOn method: the first extracts the value from the entity, and the second converts that value to the shard id. That way, you can do things like arbitrary or date-based sharding.
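As a rough illustration of the second value, the conversion from an extracted date to a shard id could look like the self-contained sketch below. The class name, the method name, and the "yyyy-MM" shard id format are all assumptions made for this example; the code does not call ShardingOn or any other RavenDB API.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, for illustration only.
public static class DateShardingSketch
{
    // Maps a date to a month-based shard id such as "2012-03".
    public static string ShardIdFor(DateTime issuedAt)
    {
        return issuedAt.ToString("yyyy-MM", CultureInfo.InvariantCulture);
    }

    public static void Main()
    {
        Console.WriteLine(ShardIdFor(new DateTime(2012, 3, 21))); // 2012-03
    }
}
```

Every shard id produced this way would still need a matching server registered in the sharding setup.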

04/06/2012 10:10 PM by Vlad

Oren, can I retrieve results even if nodes which don't contain the relevant data are down (offline)? The shards are located in different datacenters. Simple example: 1. I do Load(ID) from ShardedDocumentStore [Shard1, Shard2]. 2. Shard1 is online, Shard2 is offline (down). 3. Shard1 contains the required document. I get an exception in this case. How can I set up a session to retrieve the available data? Does Raven have this capability?

04/08/2012 10:14 AM by Ayende Rahien

Vlad, right now we don't handle this scenario, and that is on purpose. I don't know what kind of behavior would be right to implement here. But we do provide a hook that lets you handle this yourself. The hook is the IShardAccessStrategy.

04/08/2012 11:04 PM by Vlad

Oren, here is an example of the case. One of the system's customers wants to store their data in a separate db instance, in their own network/datacenter. The database is sharded by customer. The customer also wants to store user accounts at their site (a very paranoid policy). I want to avoid a situation where that customer's connection problems affect the system and other users lose access to it. It would be great to have sharding that helps avoid the problem. I've investigated the code of the existing access strategies (Sequential and Parallel) and, as far as I can see, I can write the same thing but with appropriate exception handling. Thank you and your team for the readable code!

04/09/2012 06:21 AM by Ayende Rahien

Vlad, note that one thing you have to worry about in this scenario is actually knowing that you have a connection problem. The absolute worst thing that can happen is to have a connection problem and for some of the data to just go away without you actually noticing.

04/09/2012 11:29 AM by Vlad

Sure thing, Oren. A user should be notified when the system suppresses an exception. Basically, I am thinking only about the Load(id) operation, which returns a single value. In this case we can: 1. return the found document (if one exists); 2. return null; 3. throw an exception when nothing is found and a connection is broken. Yes, you are right: the system must guarantee that the user either sees all the requested data or is notified about the problem, like "You cannot view requested data because...", or, in a very rare situation, sees part of the requested data and is notified about the problem, like "You are not seeing all of the requested data because..."

04/09/2012 11:33 AM by Ayende Rahien

Vlad, I wrote a five-part series about this exact topic:

http://ayende.com/blog/155809/api-design-sharding-status-for-failure-scenarios?key=167619a5bdec4055a66651904916ffb4
http://ayende.com/blog/155841/api-design-sharding-status-for-failure-scenariosndash-ignore-and-move-on?key=fe08107267224788b42ce633469418ec
http://ayende.com/blog/155873/api-design-sharding-status-for-failure-scenariosndash-explicit-failure-management?key=341dc08c7fde407a83dc2b7ad81c3bd0
http://ayende.com/blog/155905/api-design-sharding-status-for-failure-scenariosndash-explicit-failure-management-doesnrsquo-t-work?key=c9a5a273ba8d4029bd2f111e3a7293c8
http://ayende.com/blog/155937/api-design-sharding-status-for-failure-scenariosndash-solving-at-the-right-granularity?key=f3e596cb1e5342c193aa617553adea5d

I would like your opinion about this.