Ayende @ Rahien

It's a girl

Raven Streams: aggregations – how the user sees them

The major reason for streams is that you don’t really care about each individual item on its own; what you care about far more is some sort of aggregation over those values. Sure, you do want to be able to access the individual values, but you generally won’t need to.

Let us say that you are a phone company, and you want to use Raven Streams to record all the events that happened, so you can bill for them. Let us imagine that we are interested in just SMS for the moment, so we append each SMS event to the stream.

Then we are going to write something like:

    from msg in messages
    select new
    {
        Customer = msg.From,
        Count = 1
    }

    from result in results
    group result by result.Customer into g
    select new
    {
        Customer = g.Key,
        Count = g.Sum(x => x.Count)
    }

If you have ever written RavenDB map/reduce indexes, this should be very familiar to you. However, unlike RavenDB, here we don’t need to handle any pesky updates or deletes. That means that the implementation is much simpler, but I’ll discuss that in my next post.

In the meantime, let us consider what the result of this would be. It would generate an aggregation result, which we would persist and allow you to look up. One can imagine doing that lookup via the customer id and getting the sum total as it stands right now.
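Since the streams engine itself is still hypothetical at this point, here is a minimal sketch of that two-stage aggregation running over a plain in-memory list with standard LINQ. The Message type, the sample data, and the SmsTotals.Aggregate name are invented for illustration; the real engine would run these stages incrementally rather than over the whole list.

```csharp
using System.Collections.Generic;
using System.Linq;

// Invented for illustration: a stream item carrying the sender's customer id.
public class Message
{
    public string From;
}

public static class SmsTotals
{
    // Runs the two stages from the post over an in-memory list:
    // map each SMS to { Customer, Count = 1 }, then group and sum per customer.
    public static Dictionary<string, int> Aggregate(IEnumerable<Message> messages)
    {
        // Map stage: one entry per SMS.
        var results = from msg in messages
                      select new { Customer = msg.From, Count = 1 };

        // Reduce stage: group by customer and sum the counts.
        var totals = from result in results
                     group result by result.Customer into g
                     select new { Customer = g.Key, Count = g.Sum(x => x.Count) };

        return totals.ToDictionary(t => t.Customer, t => t.Count);
    }
}
```

Looking up the current total for a given customer is then just a keyed access on the persisted result, which matches the “lookup via the customer id” idea above.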

But you’ll probably want to do additional operations, so we need to consider this as well.

For that matter, imagine the scenario where we want to get the data about SMS, MMS, phone calls, etc. What would you expect that to look like?


Posted By: Ayende Rahien


Comments

06/04/2013 09:21 AM by Moti

"However, unlike RavenDB"

I think you meant, "However, unlike map/reduce"

06/04/2013 12:05 PM by Damian Hickey

I'm assuming the streams here are homogeneous, that is, a stream per message type. So a separate stream for each of SMS, MMS, phone calls, data connections (as each of these has different attributes, you'll probably want different aggregations across them), per billing period, per customer.

Then it would be nice to project a new stream from these multiple streams, transforming the source message to the target stream message type. For example, generating the itemized bill 'stream' (attributes: description, amount), per billing period, per customer. This could be used to figure out if the customer is still under their credit limit.

(Separately, I'm interested in looking at the underlying storage for DDD\ES type of apps where the event streams are heterogeneous.)

06/04/2013 12:20 PM by Khalid Abuhakmeh

As I commented on a previous post, snapshotting is going to be necessary.

The example you gave is incomplete, because as the phone company I need to bill based on a period of time. I would go out of business if I had to wait until the person stopped using my service to bill them.

I would need the ability to either set up a snapshot period, or create the map/reduce to be grouped by both the Month and the CustomerId.

For your second question, "For that matter, imagine the scenario where we want to get the data about SMS, MMS, phone calls, etc. How would you expect that to look?"

It would be cool if you could pass back the resulting object / its auto-generated Id and get back the collection that resulted in that outcome.

/results/1/collection -> All documents back

That way you can loop through each result and see what constituted it. This would be cool, but you would have to track it somehow.

06/04/2013 01:43 PM by Karhgath

Unlike Damian, I believe a stream should be heterogeneous. Let's say you have a Telco Stream, with SMS, MMS and Phone events.

Each item you post has a type that could, by convention, be the class name (SMSEvent, MMSEvent, PhoneEvent). The type should be in the metadata (if you keep any). This means you could do a multi-map of each type:

from msg in messages.SMSEvents select new { Customer = msg.From, SMSCount = 1, MMSCount = 0, PhoneCount = 0 }

Or simply an index per type if you want to split them (and create "substreams").

Also, as Khalid said, we'd need snapshots. Since we're always moving forward, we should only be able to create a snapshot from the last snapshot (or the start of the stream, if there is no snapshot) to the most recent item. They are on a per-index basis:

stream.Snapshot(); stream.SnapshotAll();

This could speed up map/reduce by starting the aggregation from the last snapshot and moving forward, if possible. If you do some date-related work in the map/reduce query, it would try to use a snapshot if possible, or rebuild from scratch (which would be slower).

If no events are indexed, or a minimum number of new events isn't reached (configurable, like "minimum 10 items per snapshot"), the snapshot isn't created. It should mostly be behind-the-scenes stuff and never be directly accessed/managed. You'd need to figure out where to store them (in RavenDB?), how to reference them internally, etc.

You'd need conventions for automated snapshots (disabled, every 100 items, every hour/day/week/month, every Type, dynamic per index...). That could be triggered before each append. If we do allow manual triggering of snapshots, we'd need to have some stats like "Item Count Since Last Snapshot" and stuff like that.

We'd have an issue of append date vs. item date, however (in the case above, we could append on a Monday but the phone call happened on Sunday), which is non-trivial to solve on a forward-only stream. We'd need to assume date-related stuff is always the server append date, or else we'd have ordering issues.

Unlike Khalid, and because of that last part, I believe snapshots are a performance optimization only and should never reflect business logic.

Now to handle Khalid's issues, we'd need a strategy for this. A Stream per month maybe? This means we could append an item to a specific month even after the month is over, and handle business logic on the software side (detect cutoff and all) and not in Raven Streams. For DDD AggregateRoot, you'd have a stream by AR.

Like Ids and collection names in RavenDB, we could have a convention for this:

store.Conventions.StreamIdFor = (item) => "events/" + item.EventDate.Format("yyyyMM");

store.Conventions.StreamIdFor = (item) => "aggregates/" + item.AggregateId;

06/04/2013 04:20 PM by Damian Hickey

@Karhgath Hey, I didn't say what I believed, I only stated an assumption based on the code shown - that a stream contains messages of one type. :)

06/04/2013 05:54 PM by Karhgath

@Damian No offense meant, just bad phrasing on my part ;)

06/06/2013 12:09 PM by Ayende Rahien

Moti, No, I meant what I said. Map/reduce generally doesn't deal with updates. RavenDB has updatable map/reduce, but that is pretty rare.

06/06/2013 12:10 PM by Ayende Rahien

Khalid, I am not sure what you mean when you say snapshotting. If you are talking about being able to look at the aggregation value from a previous point in time, I guess we can provide that. We are going to keep all of that information around; we aren't going to keep just the aggregation.

06/06/2013 12:26 PM by Khalid Abuhakmeh

I guess what I mean by snapshotting is the ability to take a specified range, most likely delineated by date or time, and do one of two things with the data that falls within that range.

  • Save the aggregation into a collection automatically. This would be helpful for things like Account Usage scenarios that you might bill on a monthly basis. The cool thing about this is that you could treat this collection as another stream and in essence chain another aggregation on top of your previous one.

Day Stream -> Week Stream -> Month Stream -> Year Stream

(not sure if that makes sense)

  • Or be able to specify a range ad hoc, and run the aggregation in real time. This would be helpful for exploring past data.

I guess both scenarios could be accomplished by taking the saved stream and importing it into RavenDB or SQL Server and doing analysis there, but it would be nice if it had a mechanism built in to do that.

I know we don't want to compare this to EventStore, but one of the mind blowing realizations of that software is that you can rerun all past events and, in theory, never lose business information. That is a really exciting prospect, but I have yet to see a real implementation of that model.

06/06/2013 12:30 PM by Ayende Rahien

I haven't thought yet about the ability to do something like aggregating to a different stream, but if we support heterogeneous streams, I don't see a reason why that can't be the case. I assumed that if you need time-based data, you would handle that in the aggregation already.

Comments have been closed on this topic.