Ayende @ Rahien

It's a girl

Limit your abstractions: Application Events–event processing and RX

In my last post, I mentioned that this is actually an event processing system, so we might as well use actual event processing and see what we can gain out of this. I chose to use RX (reactive extensions), which can turn a series of events into a linq statement. This is incredibly powerful, and has some interesting implications when you combine this with your architecture. In particular, let us see what we can get when we set out to replace this with RX based event processing style.

image_thumb3_thumb_thumb

We can get to something like this very easily:

public class CargoProcessor : EventsProcessor
{
    public CargoProcessor()
    {
        On<Cargo>(cargos =>
            from cargo in cargos
            where cargo.Delivery.Misdirected
            select MisdirectedCargo(cargo)
            );

        On<Cargo>(cargos =>
            from cargo in cargos
            where cargo.Delivery.UnloadedAtDestination
            select CaroArrived(cargo)
        );
    }

    private object CaroArrived(Cargo cargo)
    {
        // handle event
        return null;
    }

    private object MisdirectedCargo(Cargo cargo)
    {
        // handle event
        return null;
    }
}

We use RX to handle the linq processing over the events, and in EventsProcessor we have very little code, probably just:

    public class EventsProcessor
    {
        private readonly List<Func<IObservable<object>, IObservable<object>>>  actions = new List<Func<IObservable<object>, IObservable<object>>>();

        protected void On<T>(Func<IObservable<T>, IObservable<object>> action)
        {
            actions.Add(observable => action(observable.OfType<T>()));
        }

        public void Execute(IObservable<object> observable)
        {
            foreach (var action in actions)
            {
                action(observable).Subscribe();
            }
        }
    }

Elsewhere in the code we setup the actual Obsersable that we pass to all the EventsProcessors. The major advantages that we have with this style is that we have a natural syntax to do selection on the events that interest us, including fairly complex one. We still have easy time of creating new EventsProcessors if we want, but because the code for defining the selection is so compact, we can usually put related stuff together, which is going to be very helpful for making sure that the codebase is readable.

And, naturally, this method extends itself to handling events of multiple types in the same place. For example, if we want to also handle the HandlingEvent, we can do it in place, because it is very much related to the Cargo, it seems.

Comments

Scooletz
02/10/2012 09:41 AM by
Scooletz

I don't like this tooling. The previous proposal, with a simple interface with one method Handle was much clearer. There was no derivation, only implementation of one interface which is well known (for instance in NServiceBus - IMessageHandler). Here, you are obliged to use RX, derive from some class (yes, I know it's small but). IMHO, the previous solution was nicer.

Duke
02/10/2012 09:54 AM by
Duke

Well this looks debuggable ;)

cocowalla
02/10/2012 10:48 AM by
cocowalla

Yikes, maybe it's just me, but this seems convoluted and far less clear than the other solutions... like using a sledgehammer to crack a nut!

Colin Bull
02/10/2012 12:49 PM by
Colin Bull

IMHO OO is often used (incorrectly) to express algorithms by implementing behaviours on objects and augmenting that behaviour via hierarchies. This leads to either

overly generic interfaces with non intentional names like 'Handle' or 'Execute'

or

a complete forest of interfaces to represent each individual class of business operations

I dont think that either of these solutions are acceptable from a maintenance or readability POV.

Really the algorithm is separate from the Data and often amounts to a handful of common operations over that particular data structure. This is why LINQ is so powerful, and what ayende has exploited with the above solution.

Bil Simser
02/10/2012 01:13 PM by
Bil Simser

Hmm. I really don't like the CargoProcessor. Two problems. First, LINQ can be slow(ish) sometimes. On a simple select I don't think there's an issue here (grouping can be craptastic). However (and I'm no expert here) isn't CargoProcessor looping through cargos twice? What if this was a real-world shipping app and I was going through 30,000 items in my warehouse?

I prefer the delegation in previous posts where the determination was done as the individual entity came in then got farmed out to a separate class.

The other problem that's staring me in the face is that if I go back to the original code that started this thread, it does:

if(Cargo.Delivery.Misdirected) { ... }

if(Cargo.Delivery.Arrived) { ... }

Here in the CargoProcessor we're doing the same thing are we not?

On( misdirected ... }

On( arrived ... }

Or am I missing something?

Ayende Rahien
02/10/2012 01:31 PM by
Ayende Rahien

Bil, 30,000 items is nothing. And Linq isn't slow. It is basically method calls, so there is nothing there to be slow. The linq approach is the same exact thing as the previous one, sure, that is the point. It is a refactoring, after all. What we are doing it trying to get to better code. And I think this is clearer and more maintainable over the long run

Bil Simser
02/10/2012 01:48 PM by
Bil Simser

Okay, ignoring the LINQ performance (not sure why my LINQ statements are crap but that's another issue) I can see the clarity in the refactoring now.

I'm not familiar with the reactive extensions but it almost looks like a more simplified version of Udi Dahan's domain event pattern (http://www.udidahan.com/2009/06/14/domain-events-salvation/). Nice.

My only question is would CargoProcessor handle any kind of processing or when do you decide to split the system and come up with something that doesn't result in 10 On statements.

Frank Quednau
02/10/2012 01:54 PM by
Frank Quednau

@bil, nothing is looped here, it is more comparable to event subscription, with a new cargo instance being the event.

Ayende Rahien
02/10/2012 02:04 PM by
Ayende Rahien

Bil, Nothing prevents you from having multiple event processors. In fact, you would probably have them. The only difference is that you would put related stuff nearby, to make it easier to follow and maintain.

Omer Mor
02/10/2012 02:07 PM by
Omer Mor

Using the Rx approach you also gain elegant options for filtering (based on time like sampling and throttling, or based on data like you use did), projections (e.g turning a cargo event stream into an invoice event stream), and more. The LINQ model is a perfect fit for event processing.

Steve Wagner
02/10/2012 02:23 PM by
Steve Wagner

On() .Where(carge=>cargo.Delivery.Misdireted) .Subscribe(MisdrectedCargo);

Wouldnt this be better? This would remove the ugly return Null and seems to be more clear in what happens.

Ayende Rahien
02/10/2012 02:27 PM by
Ayende Rahien

Steve, Now try to do that on a more complex statement. Maybe something that has a grouping, etc.

But that is basically the same idea, yes. I would drop the On() entirely and do something like:

Where(carge=>cargo.Delivery.Misdireted) .Subscribe(MisdrectedCargo);

Where(carge=>cargo.Delivery.Late) .Subscribe(LateCargo);

Etc.

JarrettV
02/10/2012 08:22 PM by
JarrettV

Ayende, can you contrast this approach with Domain Events?

http://www.udidahan.com/2009/06/14/domain-events-salvation/

Mike Bild
02/10/2012 09:07 PM by
Mike Bild

I like this idea. Hold your processor logic nearly your handler impl. My suggestion would be, change the handler return type to Unit instead of object. This solves ugly Null stuff.

you can simplify the processor to:

public class EventsProcessor { private readonly Subject subject = new Subject(); protected void On(Func<IObservable, IObservable> action) { action(subject.OfType()) .Subscribe(); }

    public void Execute(IObservable<object> observable)
    {
        observable
            .Multicast(_subject)
            .Connect();
    }
}

...and I would rename Execute to Connect or StartProcessor.

Nick
02/10/2012 10:07 PM by
Nick

I think the older version was much easier to read. I understood exactly what was going on the first time I read it. Whether or not that is the best long-term approach I don't know - "just sayin".

Hendry Luk
02/11/2012 03:05 AM by
Hendry Luk

Ayende, i think what steve meant was using the standard rx methods (Where, Subscribe, etc, which also includes grouping, joining, and so on). I.e. its not a random Where method that you write yourself

Hendry Luk
02/11/2012 03:13 AM by
Hendry Luk

Also, i dont understand the purpose of the base class here. Why dont you just write that cargo subsciption logic in the Execute method directly? Rather than doing it in the constructor, keeping the logic in the list to be executed later during Execute, etc, hence requiring a base class. It does not seem to add any value. This could have been a simple single-method interface implementation, no?

Ayende Rahien
02/11/2012 03:31 AM by
Ayende Rahien

Hendry, how would you discover that? where would you put it?

This way, you split the event processing and the event submission.

Hendry Luk
02/11/2012 03:46 AM by
Hendry Luk

Wouldn't the behavior remain exactly the same if it is written as the following?

(I have a feeling the generic syntax is gonna get stripped by your blog)

public class CargoProcessor : IEventProcessor { void IEventProcessor.Execute(IObservable events) { events.OfType .Where(cargo => cargo.Delivery.Misdirected) .Subscribe(MisdirectedCargo);

    events.OfType<Cargo>
    .Where(cargo => cargo.Delivery.UnloadedAtDestination)
    .Subscribe(CaroArrived);
}

private void CaroArrived(Cargo cargo)
{
    // handle event
}

private void MisdirectedCargo(Cargo cargo)
{
    // handle event
}

}

Wouldn't that do exactly the same thing? I just dont understand the purpose of all the moving components involved in the base-class.

Ayende Rahien
02/11/2012 03:49 AM by
Ayende Rahien

Hendry, I like the base class approach better, it allows me to remove the infrastructure concerns out.

Hendry Luk
02/11/2012 03:49 AM by
Hendry Luk

I think i might have decipher out how code-formatting works on your blog. So, second attempt:

public class CargoProcessor : IEventProcessor
{
    void IEventProcessor.Execute(IObservable<object> events)
    {
        events.OfType<Cargo>
        .Where(cargo => cargo.Delivery.Misdirected)
        .Subscribe(MisdirectedCargo);

        events.OfType<Cargo>
        .Where(cargo => cargo.Delivery.UnloadedAtDestination)
        .Subscribe(CaroArrived);
    }

    private void CaroArrived(Cargo cargo)
    {
        // handle event
    }

    private void MisdirectedCargo(Cargo cargo)
    {
        // handle event
    }
}
Comments have been closed on this topic.