Raven & Event Sourcing


I keep trying to work on the replication bundle for Raven, but I keep getting distracted by more interesting stuff.

In this case, I kept coming back to several discussions I had with people who wanted to use Raven for storing events and were thinking about how to go from a stream of events to a complete aggregate. I kept thinking that Raven should already be able to handle that, and it turns out that it can, quite easily.

Raven is already capable of running operations over a stream of documents to produce a value, and going from there to an event stream that produces an aggregate is easy. The only problem was that we needed to support external views, which was easy enough to do, so let me show what we have now.
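To make the "stream of documents to a value" idea concrete: rebuilding an aggregate from events is a left fold over the stream. The sketch below is plain C# and LINQ, not Raven code; the `CartEvent` record and the `FoldSketch` class are hypothetical, and the cart is simplified to a set of product ids.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal event: just enough to show the fold.
public record CartEvent(string Type, string ProductId);

public static class FoldSketch
{
    // Rebuilding state from events is a left fold: start from an empty
    // state and apply each event, in order, to the state so far.
    public static SortedSet<string> ReplayProducts(IEnumerable<CartEvent> events) =>
        events.Aggregate(new SortedSet<string>(), (products, e) =>
        {
            switch (e.Type)
            {
                case "Add": products.Add(e.ProductId); break;
                case "Remove": products.Remove(e.ProductId); break;
            }
            return products;
        });
}
```

For example, replaying `Add p1`, `Add p2`, `Remove p1` leaves a set containing only `p2`.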

Let us assume that we have the events shown on the right stored in Raven. As you can see, this is a stream of events for a shopping cart. What we want is to go from there to an actual shopping cart.

We define the following view:

[Image: class diagram of the types involved in the view]

I am showing the class diagram here so you can see all the types involved. Note that ShoppingCart has AddToCart and RemoveFromCart methods, which have the typical implementation.
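A minimal sketch of what these types might look like. Only `ShoppingCart`, `ShoppingCartCustomer`, `AddToCart` and `RemoveFromCart` (with the argument order implied by the view code) come from the post; `ShoppingCartLine`, `Items`, `Quantity` and `Total` are assumptions, and product ids are assumed to be strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class ShoppingCartCustomer
{
    public string Id { get; set; }
    public string Name { get; set; }
}

// Hypothetical line type; the real class diagram may differ.
public class ShoppingCartLine
{
    public string ProductId { get; set; }
    public string ProductName { get; set; }
    public decimal Price { get; set; }
    public int Quantity { get; set; }
}

public class ShoppingCart
{
    public string Id { get; set; }
    public ShoppingCartCustomer Customer { get; set; }
    public List<ShoppingCartLine> Items { get; set; } = new List<ShoppingCartLine>();

    // Assumed convenience property: sum of price times quantity over all lines.
    public decimal Total => Items.Sum(line => line.Price * line.Quantity);

    // Adding a product that is already in the cart bumps its quantity
    // instead of adding a second line.
    public void AddToCart(string productId, string productName, decimal price)
    {
        var line = Items.FirstOrDefault(l => l.ProductId == productId);
        if (line == null)
        {
            Items.Add(new ShoppingCartLine
            {
                ProductId = productId,
                ProductName = productName,
                Price = price,
                Quantity = 1
            });
        }
        else
        {
            line.Quantity++;
        }
    }

    // Removing drops the whole line for that product.
    public void RemoveFromCart(string productId)
    {
        Items.RemoveAll(l => l.ProductId == productId);
    }
}
```

Whether RemoveFromCart drops the whole line or decrements the quantity is a design choice; this sketch drops the line.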

Now, let us look at the actual view code:

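    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Linq;
    using Newtonsoft.Json.Linq;
    // Plus the Raven namespaces that define AbstractViewGenerator and FieldIndexing
    // (their location depends on the Raven build you are using).

    // DisplayName becomes the name of the index.
    [DisplayName("Aggregates/ShoppingCart")]
    public class ShoppingCartEventsToShopingCart : AbstractViewGenerator
    {
        public ShoppingCartEventsToShopingCart()
        {
            // Map: only look at shopping cart events.
            MapDefinition = docs => docs.Where(document => document.For == "ShoppingCart");
            // One aggregate per cart id; Raven uses this key to optimize updates.
            GroupByExtraction = source => source.ShoppingCartId;
            ReduceDefinition = Reduce;

            // Id is indexed as a single exact-match term; Aggregate is not indexed for search.
            Indexes.Add("Id", FieldIndexing.NotAnalyzed);
            Indexes.Add("Aggregate", FieldIndexing.No);
        }

        private static IEnumerable<object> Reduce(IEnumerable<dynamic> source)
        {
            foreach (var events in source
                .GroupBy(@event => @event.ShoppingCartId))
            {
                var cart = new ShoppingCart { Id = events.Key };
                // Replay this cart's events in the order they happened.
                foreach (var @event in events.OrderBy(x => x.Timestamp))
                {
                    switch ((string)@event.Type)
                    {
                        case "Create":
                            cart.Customer = new ShoppingCartCustomer
                            {
                                Id = @event.CustomerId,
                                Name = @event.CustomerName
                            };
                            break;
                        case "Add":
                            cart.AddToCart(@event.ProductId, @event.ProductName, (decimal)@event.Price);
                            break;
                        case "Remove":
                            cart.RemoveFromCart(@event.ProductId);
                            break;
                    }
                }
                // Emit the cart id and the serialized cart as the index entry.
                yield return new
                {
                    cart.Id,
                    Aggregate = JObject.FromObject(cart)
                };
            }
        }
    }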

There are several interesting things happening in this class:

  • The DisplayName attribute gives the index its name.
  • In the constructor, we define the map part as a filter that selects only shopping cart events.
  • We will create one shopping cart per shopping cart id, so we specify the group-by extraction method. Raven uses it to optimize updates.
  • Note the index definitions: we want the Id to be indexed as a single, non-analyzed term so it can serve as an exact-match key, and we want the aggregate data stored but not indexed for searching.
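Conceptually, the map and group-by extraction steps do the following, written here as ordinary in-memory LINQ rather than through Raven. `EventDoc` is a hypothetical typed stand-in for the dynamic event documents, and `MapGroupSketch` is an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical typed stand-in for the dynamic event documents.
public record EventDoc(string For, string ShoppingCartId, string Type);

public static class MapGroupSketch
{
    // The map step: keep only shopping cart events (mirrors MapDefinition).
    public static IEnumerable<EventDoc> Map(IEnumerable<EventDoc> docs) =>
        docs.Where(doc => doc.For == "ShoppingCart");

    // The group-by extraction: one group, and so one aggregate, per cart id.
    public static Dictionary<string, List<EventDoc>> GroupByCart(IEnumerable<EventDoc> docs) =>
        Map(docs)
            .GroupBy(doc => doc.ShoppingCartId)
            .ToDictionary(g => g.Key, g => g.ToList());
}
```

The filter runs before the grouping, so non-cart documents never reach a group, and because each group is keyed by cart id, a new event only affects the group for its own cart.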

Now, let us talk about the most interesting bit, the Reduce function.

That function should be pretty easy to follow, I think. We get a stream of events and group them by their shopping cart id. Then, for each shopping cart, we sort the events by their timestamp and replay them to build the aggregate.
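Here are the same steps run outside Raven against typed events, so the effect of the timestamp sort is visible. `CartEvent`, `Cart` and `ReduceSketch` are hypothetical, and the cart is simplified to a customer name plus a product-to-price map, so quantities are ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical typed event; the real view reads these fields off dynamic documents.
public record CartEvent(string ShoppingCartId, DateTime Timestamp, string Type,
    string CustomerName = null, string ProductId = null, decimal Price = 0m);

// Deliberately minimal cart, assumed for this sketch only.
public class Cart
{
    public string Id { get; init; }
    public string CustomerName { get; set; }
    public Dictionary<string, decimal> Items { get; } = new Dictionary<string, decimal>();
}

public static class ReduceSketch
{
    // Same shape as the view's Reduce: group by cart id, sort each group by
    // timestamp, then apply the events in order to an empty cart.
    public static IEnumerable<Cart> Reduce(IEnumerable<CartEvent> source)
    {
        foreach (var events in source.GroupBy(e => e.ShoppingCartId))
        {
            var cart = new Cart { Id = events.Key };
            foreach (var e in events.OrderBy(x => x.Timestamp))
            {
                switch (e.Type)
                {
                    case "Create": cart.CustomerName = e.CustomerName; break;
                    case "Add": cart.Items[e.ProductId] = e.Price; break;
                    case "Remove": cart.Items.Remove(e.ProductId); break;
                }
            }
            yield return cart;
        }
    }
}
```

If the events for a cart are stored out of order (say, a Remove for `p1` stamped after the Add for `p1` but stored before it), the OrderBy is what makes the Remove win. Without it, the Remove would be applied first as a no-op and `p1` would wrongly stay in the cart.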

Finally, we return the data so Raven will store it in the index.

The result, by the way, is this:

[Image: the resulting shopping cart aggregate]

I think this is cool.

Using this approach, Raven will automatically keep the aggregate up to date as new events arrive on the event streams. Furthermore, the aggregate is only recomputed when a change happens, so reading it is very cheap.
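A rough in-memory model of that trade-off: the work happens when an event is written, and reads are plain lookups. This is not how Raven implements it (Raven updates its indexes in the background); `MaterializedCarts` and its members are hypothetical, and events are assumed to arrive already in order.

```csharp
using System;
using System.Collections.Generic;

public record CartEvent(string ShoppingCartId, string Type, string ProductId);

// Hypothetical "compute on write, read cheaply" model.
public class MaterializedCarts
{
    private readonly Dictionary<string, List<CartEvent>> streams = new();
    private readonly Dictionary<string, IReadOnlyList<string>> aggregates = new();

    // Counts how many times an aggregate was rebuilt, to show reads do no work.
    public int Recomputations { get; private set; }

    public void Append(CartEvent e)
    {
        if (!streams.TryGetValue(e.ShoppingCartId, out var stream))
        {
            stream = new List<CartEvent>();
            streams[e.ShoppingCartId] = stream;
        }
        stream.Add(e);
        // Only the cart that received the event is rebuilt.
        aggregates[e.ShoppingCartId] = Rebuild(stream);
        Recomputations++;
    }

    // Reads return the precomputed aggregate without replaying anything.
    public IReadOnlyList<string> Get(string cartId) =>
        aggregates.TryGetValue(cartId, out var products) ? products : Array.Empty<string>();

    private static IReadOnlyList<string> Rebuild(IEnumerable<CartEvent> stream)
    {
        var products = new List<string>();
        foreach (var e in stream)
        {
            if (e.Type == "Add") products.Add(e.ProductId);
            else if (e.Type == "Remove") products.Remove(e.ProductId);
        }
        return products;
    }
}
```

The cost of a write is proportional to the events of one cart, while any number of reads cost nothing extra, which matches the read-heavy access pattern described above.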

Finally, if we have a storm of events on a particular shopping cart, we can choose whether to wait and see its most recent version, or get a potentially stale view of it really fast.