Limit your abstractionsApplication Events–event processing and RX
In my last post, I mentioned that this is actually an event processing system, so we might as well use actual event processing and see what we can gain out of this. I chose to use RX (reactive extensions), which can turn a series of events into a linq statement. This is incredibly powerful, and has some interesting implications when you combine this with your architecture. In particular, let us see what we can get when we set out to replace this with RX based event processing style.
We can get to something like this very easily:
public class CargoProcessor : EventsProcessor { public CargoProcessor() { On<Cargo>(cargos => from cargo in cargos where cargo.Delivery.Misdirected select MisdirectedCargo(cargo) ); On<Cargo>(cargos => from cargo in cargos where cargo.Delivery.UnloadedAtDestination select CaroArrived(cargo) ); } private object CaroArrived(Cargo cargo) { // handle event return null; } private object MisdirectedCargo(Cargo cargo) { // handle event return null; } }
We use RX to handle the linq processing over the events, and in EventsProcessor we have very little code, probably just:
public class EventsProcessor { private readonly List<Func<IObservable<object>, IObservable<object>>> actions = new List<Func<IObservable<object>, IObservable<object>>>(); protected void On<T>(Func<IObservable<T>, IObservable<object>> action) { actions.Add(observable => action(observable.OfType<T>())); } public void Execute(IObservable<object> observable) { foreach (var action in actions) { action(observable).Subscribe(); } } }
Elsewhere in the code we setup the actual Obsersable that we pass to all the EventsProcessors. The major advantages that we have with this style is that we have a natural syntax to do selection on the events that interest us, including fairly complex one. We still have easy time of creating new EventsProcessors if we want, but because the code for defining the selection is so compact, we can usually put related stuff together, which is going to be very helpful for making sure that the codebase is readable.
And, naturally, this method extends itself to handling events of multiple types in the same place. For example, if we want to also handle the HandlingEvent, we can do it in place, because it is very much related to the Cargo, it seems.
More posts in "Limit your abstractions" series:
- (22 Feb 2012) And how do you handle testing?
- (21 Feb 2012) The key is in the infrastructure…
- (20 Feb 2012) Refactoring toward reduced abstractions
- (16 Feb 2012) So what is the whole big deal about?
- (15 Feb 2012) All cookies looks the same to the cookie cutter
- (14 Feb 2012) Commands vs. Tasks, did you forget the workflow?
- (13 Feb 2012) You only get six to a dozen in the entire app
- (10 Feb 2012) Application Events–event processing and RX
- (09 Feb 2012) Application Events–Proposed Solution #2–Cohesion
- (07 Feb 2012) Application Events–Proposed Solution #1
- (06 Feb 2012) Application Events–what about change?
- (03 Feb 2012) Application Events–the wrong way
- (02 Feb 2012) Analyzing a DDD application
Comments
I don't like this tooling. The previous proposal, with a simple interface with one method Handle was much clearer. There was no derivation, only implementation of one interface which is well known (for instance in NServiceBus - IMessageHandler). Here, you are obliged to use RX, derive from some class (yes, I know it's small but). IMHO, the previous solution was nicer.
Well this looks debuggable ;)
Yikes, maybe it's just me, but this seems convoluted and far less clear than the other solutions... like using a sledgehammer to crack a nut!
IMHO OO is often used (incorrectly) to express algorithms by implementing behaviours on objects and augmenting that behaviour via hierarchies. This leads to either
overly generic interfaces with non intentional names like 'Handle' or 'Execute'
or
a complete forest of interfaces to represent each individual class of business operations
I dont think that either of these solutions are acceptable from a maintenance or readability POV.
Really the algorithm is separate from the Data and often amounts to a handful of common operations over that particular data structure. This is why LINQ is so powerful, and what ayende has exploited with the above solution.
Hmm. I really don't like the CargoProcessor. Two problems. First, LINQ can be slow(ish) sometimes. On a simple select I don't think there's an issue here (grouping can be craptastic). However (and I'm no expert here) isn't CargoProcessor looping through cargos twice? What if this was a real-world shipping app and I was going through 30,000 items in my warehouse?
I prefer the delegation in previous posts where the determination was done as the individual entity came in then got farmed out to a separate class.
The other problem that's staring me in the face is that if I go back to the original code that started this thread, it does:
if(Cargo.Delivery.Misdirected) { ... }
if(Cargo.Delivery.Arrived) { ... }
Here in the CargoProcessor we're doing the same thing are we not?
On<Cargo>( misdirected ... }
On<Cargo>( arrived ... }
Or am I missing something?
Bil, 30,000 items is nothing. And Linq isn't slow. It is basically method calls, so there is nothing there to be slow. The linq approach is the same exact thing as the previous one, sure, that is the point. It is a refactoring, after all. What we are doing it trying to get to better code. And I think this is clearer and more maintainable over the long run
Okay, ignoring the LINQ performance (not sure why my LINQ statements are crap but that's another issue) I can see the clarity in the refactoring now.
I'm not familiar with the reactive extensions but it almost looks like a more simplified version of Udi Dahan's domain event pattern (http://www.udidahan.com/2009/06/14/domain-events-salvation/). Nice.
My only question is would CargoProcessor handle any kind of processing or when do you decide to split the system and come up with something that doesn't result in 10 On<Cargo> statements.
@bil, nothing is looped here, it is more comparable to event subscription, with a new cargo instance being the event.
Bil, Nothing prevents you from having multiple event processors. In fact, you would probably have them. The only difference is that you would put related stuff nearby, to make it easier to follow and maintain.
Using the Rx approach you also gain elegant options for filtering (based on time like sampling and throttling, or based on data like you use did), projections (e.g turning a cargo event stream into an invoice event stream), and more. The LINQ model is a perfect fit for event processing.
On<Cargo>() .Where(carge=>cargo.Delivery.Misdireted) .Subscribe(MisdrectedCargo);
Wouldnt this be better? This would remove the ugly return Null and seems to be more clear in what happens.
Steve, Now try to do that on a more complex statement. Maybe something that has a grouping, etc.
But that is basically the same idea, yes. I would drop the On() entirely and do something like:
Where(carge=>cargo.Delivery.Misdireted) .Subscribe(MisdrectedCargo);
Where(carge=>cargo.Delivery.Late) .Subscribe(LateCargo);
Etc.
Ayende, can you contrast this approach with Domain Events?
http://www.udidahan.com/2009/06/14/domain-events-salvation/
I like this idea. Hold your processor logic nearly your handler impl. My suggestion would be, change the handler return type to Unit instead of object. This solves ugly Null stuff.
you can simplify the processor to:
public class EventsProcessor { private readonly Subject<object> _subject = new Subject<object>(); protected void On<T>(Func<IObservable<T>, IObservable<Unit>> action) { action(_subject.OfType<T>()) .Subscribe(); }
...and I would rename Execute to Connect or StartProcessor.
I think the older version was much easier to read. I understood exactly what was going on the first time I read it. Whether or not that is the best long-term approach I don't know - "just sayin".
Ayende, i think what steve meant was using the standard rx methods (Where, Subscribe, etc, which also includes grouping, joining, and so on). I.e. its not a random Where method that you write yourself
Also, i dont understand the purpose of the base class here. Why dont you just write that cargo subsciption logic in the Execute method directly? Rather than doing it in the constructor, keeping the logic in the list to be executed later during Execute, etc, hence requiring a base class. It does not seem to add any value. This could have been a simple single-method interface implementation, no?
Hendry, how would you discover that? where would you put it?
This way, you split the event processing and the event submission.
Wouldn't the behavior remain exactly the same if it is written as the following?
(I have a feeling the generic syntax is gonna get stripped by your blog)
public class CargoProcessor : IEventProcessor { void IEventProcessor.Execute(IObservable<object> events) { events.OfType<Cargo> .Where(cargo => cargo.Delivery.Misdirected) .Subscribe(MisdirectedCargo);
}
Wouldn't that do exactly the same thing? I just dont understand the purpose of all the moving components involved in the base-class.
Hendry, I like the base class approach better, it allows me to remove the infrastructure concerns out.
I think i might have decipher out how code-formatting works on your blog. So, second attempt:
Comment preview