Ayende @ Rahien

Pipes and filters: The IEnumerable approach

Pipes are very common in computing. They are a very good way to turn a complex problem into a set of small problems. You are probably familiar with the pattern, even if not explicitly.

  • The ASP.Net Http Pipeline (Begin_Request, Authorize_Request, etc)
  • Compiler Pipelines (Parse, ProcessTypes, SaveAssembly, etc)
  • Command Line piping (ps -ax | grep Finder)

What I wanted to talk about today was how to implement this in code. I did several implementations of pipes and filters in the past, and they were all overly complex. I took this weekend to look at the problem again, and I came up with a ridiculously simple solution.

In a nutshell, here it is:

image

We have a pipeline that is composed of operations. Each operation accepts an input and returns an output. The use of IEnumerable<T> means that we can stream items through the entire process without any effort whatsoever.
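The classes in this post all implement an IOperation<T> interface whose definition is not listed. A minimal sketch of what it presumably looks like, inferred from the Execute signature the stages share, follows. The DoubleEach stage and the OperationDemo class are hypothetical names added only to make the sketch runnable.

```csharp
using System;
using System.Collections.Generic;

// Assumed contract: inferred from the Execute signature shared by the
// stages in this post, not copied from the original source.
public interface IOperation<T>
{
    // Receives the upstream sequence and returns the downstream one.
    IEnumerable<T> Execute(IEnumerable<T> input);
}

// Hypothetical stage, for illustration only.
public class DoubleEach : IOperation<int>
{
    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        foreach (int value in input)
            yield return value * 2;
    }
}

public static class OperationDemo
{
    public static void Main()
    {
        IOperation<int> stage = new DoubleEach();
        Console.WriteLine(string.Join(", ", stage.Execute(new[] { 1, 2, 3 })));
    }
}
```

Because both the input and the output are IEnumerable<T>, any stage can be plugged in after any other stage of the same T.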

Most problems that call for the pipeline approach are fairly complex, so picking a simple example means that it would be trivial to implement otherwise. Let us go with the really trivial sample of printing all the processes whose working set is greater than 50 MB.

We have three stages in the pipeline, the first, get processes:

public class GetAllProcesses : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        return Process.GetProcesses();
    }
}

The second, limit by working set size:

public class LimitByWorkingSetSize : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        int maxSizeBytes = 50 * 1024 * 1024;
        foreach (Process process in input)
        {
            if (process.WorkingSet64 > maxSizeBytes)
                yield return process;
        }
    }
}

The third, print process name:

public class PrintProcessName : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        foreach (Process process in input)
        {
            System.Console.WriteLine(process.ProcessName);
        }
        yield break;
    }
}

All of those are very trivial implementations. You can see that the GetAllProcesses class doesn't care about its input; it is the source. LimitByWorkingSetSize iterates over the input and uses "yield return" to stream the results to the next step, PrintProcessName. Since this step is the final one, we use "yield break" to make it compile without returning anything. (We could return null, but that would be rude.)

It is important to note that the second stage uses the if to control what gets passed downstream.

Now we only have to bring them together, right?

public class TrivialProcessesPipeline : Pipeline<Process>
{
    public TrivialProcessesPipeline()
    {
        Register(new GetAllProcesses());
        Register(new LimitByWorkingSetSize());
        Register(new PrintProcessName());
    }
}

Now, executing this pipeline will execute all three steps, in a streaming fashion.

Okay, this is a lot of code that we can replace with the following snippet:

int maxSizeBytes = 50 * 1024 * 1024;
foreach (Process process in Process.GetProcesses())
{
     if (process.WorkingSet64 > maxSizeBytes)
         System.Console.WriteLine(process.ProcessName);
}

What are we getting from this?

Composability and streaming. When we execute the pipeline, we are not executing each step in turn, we are executing them all in parallel. (Well, not in parallel, but together.)
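To make "together, not in parallel" concrete, here is a small sketch that records the order in which two chained iterator stages run. The names StreamingDemo, Source, OnlyOdd and Log are made up for this illustration and are not part of the pipeline code above.

```csharp
using System;
using System.Collections.Generic;

public static class StreamingDemo
{
    // Shared trace so the order of events across stages can be inspected.
    public static readonly List<string> Log = new List<string>();

    // Hypothetical source stage: produces 1, 2, 3.
    static IEnumerable<int> Source()
    {
        for (int i = 1; i <= 3; i++)
        {
            Log.Add("produce " + i);
            yield return i;
        }
    }

    // Hypothetical filter stage: passes only odd numbers downstream.
    static IEnumerable<int> OnlyOdd(IEnumerable<int> input)
    {
        foreach (int i in input)
        {
            if (i % 2 == 1)
            {
                Log.Add("pass " + i);
                yield return i;
            }
        }
    }

    public static List<string> Run()
    {
        Log.Clear();
        foreach (int i in OnlyOdd(Source()))
            Log.Add("consume " + i);
        return Log;
    }

    public static void Main()
    {
        // Prints: produce 1, pass 1, consume 1, produce 2, produce 3, pass 3, consume 3
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Each item travels through all the stages before the next item is produced, which is why no stage ever has to hold the whole collection in memory.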

Hey, I didn't show you how the Pipeline<T> was implemented, right?

public class Pipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public Pipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}

I'll leave you to ponder that.
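For readers who want a hint: iterator methods are lazy, so chaining the operations only builds the chain, and nothing executes until something calls MoveNext. The sketch below demonstrates this with a hypothetical counting stage (DeferredDemo, Stage and Calls are illustrative names).

```csharp
using System;
using System.Collections.Generic;

public static class DeferredDemo
{
    // Counts how many items the stage has actually processed.
    public static int Calls;

    static IEnumerable<int> Stage(IEnumerable<int> input)
    {
        foreach (int i in input)
        {
            Calls++;
            yield return i;
        }
    }

    // Returns the counter before and after draining the sequence.
    public static int[] Measure()
    {
        Calls = 0;
        IEnumerable<int> current = Stage(new[] { 1, 2, 3 });
        int before = Calls;                 // still 0: the chain is only built

        using (IEnumerator<int> enumerator = current.GetEnumerator())
        {
            while (enumerator.MoveNext()) { }   // pulls every item through
        }
        return new[] { before, Calls };
    }

    public static void Main()
    {
        int[] counts = Measure();
        Console.WriteLine(counts[0] + " -> " + counts[1]); // prints "0 -> 3"
    }
}
```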

Comments

Mark Monster
01/05/2008 08:39 PM by
Mark Monster

I've heard about Pipes and Filters as an Architecture Pattern. But didn't do any implementation of it yet. Do you have a real world example about when Pipes and Filters is a good use?

Besides this, a very nice Pipes and Filters solution I think. I haven't tried the code, yet. But what about the last two lines in Pipeline.Execute, what's the use?

IEnumerator<T> enumerator = current.GetEnumerator();

while (enumerator.MoveNext()) ;

Ayende Rahien
01/05/2008 08:43 PM by
Ayende Rahien

I gave a few in the beginning of the post.

Others include batch processing, ETL, workflow, etc.

The last two lines are where the magic happens, they are driving the whole thing.

Avish
01/05/2008 08:57 PM by
Avish

That's nice, but I'm bugged about the way the ends are implemented. The first step ignores its input, and in the last step we had to trick the compiler since we're not returning anything. Also, the last "while (enumerator.MoveNext())" is a little ugly.

Also, I think the example doesn't do a good job of explaining the need. This kind of thing really calls for list comprehensions (p.Name for p in Process.GetProcesses if p.WorkingSet > threshold) or LINQ, I guess. This heavy-duty kind of pipeline is really useful for doing complicated, state-sensitive work on a collection of objects (the compiler example is a better one).

Arnon Rotem-Gal-Oz
01/05/2008 10:12 PM by
Arnon Rotem-Gal-Oz

Hi Oren

You got the pipes and filters mixed

The pipes are where the messages flow and the filters are where the processing is done (see http://www.enterpriseintegrationpatterns.com/PipesAndFilters.html)

Arnon

Omer Mor
01/05/2008 11:26 PM by
Omer Mor

Nice implementation, Oren.

Is there any reason you chose to use an interface (IOperation<T>) instead of a delegate?

I don't like declaring new classes if I don't have to, and I don't see any problem with replacing IOperation<T> with a method that fits the following delegate:

delegate IEnumerable<T> ExecuteOperation<T>(IEnumerable<T> input);

If you need a stateful class that contains the operation, then you can implement one and pass its Execute method as the delegate, but if your operations are stateless, then a simple (possibly static) method will do.
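A minimal sketch of that delegate-based variant, using the built-in Func<IEnumerable<T>, IEnumerable<T>> rather than a custom delegate type. DelegatePipeline<T>, its Run method and the demo stages are hypothetical names, not code from the post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical delegate-based pipeline: each step is just a function
// from one sequence to another.
public class DelegatePipeline<T>
{
    private readonly List<Func<IEnumerable<T>, IEnumerable<T>>> steps =
        new List<Func<IEnumerable<T>, IEnumerable<T>>>();

    public DelegatePipeline<T> Register(Func<IEnumerable<T>, IEnumerable<T>> step)
    {
        steps.Add(step);
        return this;
    }

    // Chains the steps lazily; the caller decides when to enumerate.
    public IEnumerable<T> Run(IEnumerable<T> source)
    {
        IEnumerable<T> current = source;
        foreach (Func<IEnumerable<T>, IEnumerable<T>> step in steps)
            current = step(current);
        return current;
    }
}

public static class DelegatePipelineDemo
{
    // A stateless step written as a plain static iterator method.
    static IEnumerable<int> OnlyEven(IEnumerable<int> input)
    {
        foreach (int i in input)
            if (i % 2 == 0)
                yield return i;
    }

    public static List<int> Run()
    {
        return new DelegatePipeline<int>()
            .Register(OnlyEven)                      // method group
            .Register(xs => xs.Select(x => x * 10))  // lambda
            .Run(new[] { 1, 2, 3, 4 })
            .ToList();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "20, 40"
    }
}
```

A stateful operation still fits: create the object and register its Execute method as the step.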

Alex Henderson
01/06/2008 12:10 AM by
Alex Henderson

Hi Oren,

The thing that bugs me is the IEnumerable<T> being baked in - why not make the operations work with T instead of IEnumerable<T>? You can still achieve the same end result, but you can use the pipeline for processing singular items like requests... or am I missing something?

Maybe something like this:

http://trac.devdefined.com/public/trac/tools.devdefined.com/browser/trunk/src/DevDefined.Common/Pipeline/Pipeline.cs

Paul Stovell
01/06/2008 12:31 AM by
Paul Stovell

Hi Oren,

Pipes and filters are exactly what LINQ does.

Instead of implementing an IOperation<T> interface, you could simply implement IEnumerable<T>. The following extension methods:

public static IEnumerable<Process> LimitByWorkingSetSize(this IEnumerable<Process> inputs);

public static IEnumerable<Process> PrintProcessName(this IEnumerable<Process> inputs);

Would be all you need. You can then pipe them like so:

GetAllProcesses().LimitByWorkingSetSize().PrintProcessName();

The Pipeline<T> class also has a severe limitation in that it assumes all operations are for the same type. This disables transformation features within an operation. The Pipeline<T> class should be simply Pipeline, and the operation should at least be:

IOperation<TInput, TOutput>

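However, what if an operation has multiple inputs? (Like a union for example.) You could collect all of the inputs into one container object. But why not simply use a method, IEnumerable<T>, and skip the IOperation<T> interface?

// Requires: using System.Collections; using System.Collections.Generic; using System.Diagnostics;
// The extension method must live in a static class.
static IEnumerable<Process> LimitByWorkingSetSize(this IEnumerable<Process> inputs, int size) {
    return new LimitByWorkingSetSizeEnumerator(inputs, size);
}

class LimitByWorkingSetSizeEnumerator : IEnumerable<Process> {
    private readonly int _size;
    private readonly IEnumerable<Process> _inputs;

    public LimitByWorkingSetSizeEnumerator(IEnumerable<Process> inputs, int size) {
        _size = size; _inputs = inputs;
    }

    public IEnumerator<Process> GetEnumerator() {
        foreach (Process p in _inputs) {
            // Keeps processes whose working set is below the limit.
            if (p.WorkingSet64 < _size) {
                yield return p;
            }
        }
    }

    // Non-generic overload required by IEnumerable.
    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }
}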

Note that the implementations can be stateful or stateless. Consider the "Where" extension, which filters items one-by-one, or the OrderBy extension, which reads all of the inputs before sorting and returning the outputs.

Ayende Rahien
01/06/2008 07:19 AM by
Ayende Rahien

Avish,

yes, we are cheating the compiler to get the nice programming model.

I mentioned that this is a trivial example. The real ones are usually too complex to be easily explained.

Ayende Rahien
01/06/2008 07:22 AM by
Ayende Rahien

Arnon,

Thanks for pointing it out.

It looks like I am calling filters a state or operation, is that what you mean?

I don't like the name filter in this case, because most of the time I am not doing filtering there, I am doing transformations on the data.

Note to self: re-read pattern's description before using it.

Ayende Rahien
01/06/2008 07:23 AM by
Ayende Rahien

Omer,

A class gives me more options and has better scalability in general.

You could do it with a delegate, but using classes makes sense in this scenario, I want to have a lot of small tiny classes.

Ayende Rahien
01/06/2008 07:25 AM by
Ayende Rahien

Alex,

Because of the filtering thing.

I may want to stop processing an item, so I can just not yield it.

I may want to split an item, so I can just yield it twice.
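Both cases can be sketched with a single iterator. This is an illustrative example only: DropAndSplitDemo and its rules (drop blank lines, split on ';') are assumptions chosen to show the idea, not part of the pipeline in the post.

```csharp
using System;
using System.Collections.Generic;

public static class DropAndSplitDemo
{
    // Hypothetical stage: blank lines are dropped, "a;b" becomes two items.
    public static IEnumerable<string> Execute(IEnumerable<string> input)
    {
        foreach (string line in input)
        {
            if (line.Trim().Length == 0)
                continue;                  // not yielded: processing stops here

            foreach (string part in line.Split(';'))
                yield return part;         // yielded once per part
        }
    }

    public static void Main()
    {
        // Prints a, b and c on separate lines.
        foreach (string item in Execute(new[] { "a;b", "", "c" }))
            Console.WriteLine(item);
    }
}
```

An operation typed as T in, T out could not express either case without extra machinery, because it must return exactly one value per input.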

Ayende Rahien
01/06/2008 07:28 AM by
Ayende Rahien

Paul,

Consider a set of business rules that need to execute on an order batch, to decide if we can approve it.

Consider a set of transformations that a message goes through before it is let out the door.

Consider a set of data manipulation that occurs for a row in an ETL process.

Linq is not a good approach in those scenarios. I am not talking about querying, in most cases, I am talking about processing.

Markus Zywitza
01/06/2008 12:29 PM by
Markus Zywitza

For better composability, you should consider Pipeline<T> implementing IOperation<T> to allow concatenating Pipelines.

The Execute()-Method will then use the input enumeration instead of creating a new List and yield its results, avoiding the ugly empty while-loop as a side benefit.
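A rough sketch of that suggestion, assuming the IOperation<T> shape used throughout the post. ComposablePipeline<T> and the AddOne stage are hypothetical names. Here Execute returns the lazily built chain, which has the same effect as yielding its items.

```csharp
using System;
using System.Collections.Generic;

// Assumed contract, matching the Execute signature used in the post.
public interface IOperation<T>
{
    IEnumerable<T> Execute(IEnumerable<T> input);
}

// Hypothetical pipeline that is itself an operation, so pipelines nest.
public class ComposablePipeline<T> : IOperation<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public ComposablePipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    // Uses the caller's input instead of an empty list and hands back
    // the lazy chain; whoever enumerates it drives the whole thing.
    public IEnumerable<T> Execute(IEnumerable<T> input)
    {
        IEnumerable<T> current = input;
        foreach (IOperation<T> operation in operations)
            current = operation.Execute(current);
        return current;
    }
}

// Hypothetical stage, for illustration only.
public class AddOne : IOperation<int>
{
    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        foreach (int value in input)
            yield return value + 1;
    }
}

public static class ComposableDemo
{
    public static List<int> Run()
    {
        ComposablePipeline<int> inner = new ComposablePipeline<int>()
            .Register(new AddOne())
            .Register(new AddOne());
        ComposablePipeline<int> outer = new ComposablePipeline<int>()
            .Register(inner)               // a pipeline used as a stage
            .Register(new AddOne());
        return new List<int>(outer.Execute(new[] { 1, 2 }));
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "4, 5"
    }
}
```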

Jon Skeet
01/06/2008 02:59 PM by
Jon Skeet

Business rules deciding on whether or not an order batch should be approved: Where clause.

Transformation: Select clause

Data manipulation for a row in an ETL process: I don't know enough about this to comment.

The first two at least are perfectly reasonable uses for LINQ, and I suspect the last is too. LINQ is for more than just querying. Paul is right: pipes and filters are exactly what LINQ is all about, at least for LINQ to Objects.

Jon

Ayende Rahien
01/06/2008 03:04 PM by
Ayende Rahien

Jon,

When I am thinking about selecting & transforming, I am usually talking about more than one liners.

Assume that you want to interrogate an external system for data about the customer credit status.

Or that the transformation is involved or contains complex business logic.

All I have seen of linq so far convinced me that it breaks down really fast when you get to complex stuff.

Jon Skeet
01/06/2008 04:02 PM by
Jon Skeet

If you want more than a one liner, write a method and use that as the action of the delegate instance. Don't forget:

1) You don't have to use anonymous methods or lambda expressions to create delegate instances.

2) You don't have to use query expressions to use LINQ.

3) You can write your own extension methods as well to expand LINQ as you need to.

Now admittedly the bug wrt output type inference of method groups is a slight disadvantage here - but it's better than being forced to (manually) create a new type every time you need a different kind of filter or transformation.

I really believe that pretty much any limitation of LINQ is going to prove a limitation of your scheme above too - simply because they're so similar. The advantage of LINQ is that for simple cases you can use query expressions, lambda expressions etc. Oh, and it's going to be rather better understood by the majority of developers in the next couple of years :)

Jon

Ayende Rahien
01/06/2008 06:40 PM by
Ayende Rahien

Jon,

The only criteria that I have for this is how maintainable I can make it.

I don't see Linq adding anything to the mix here. It is possible that I am wrong, but I would wait to see the code before being able to say so.

Jon
01/06/2008 06:54 PM by
Jon

Using LINQ would add four things:

1) Not reinventing the wheel. If you hire a C# developer in a year, I'd hope they'd be familiar with LINQ. They probably won't be familiar with your pipeline framework.

2) Taking advantage of the ease of creating delegates in C# 3, rather than forcing the use of interfaces.

3) Transformation ability, as Paul pointed out, where the input and output types can be different

4) The ability to integrate simply with other LINQ-related technologies such as Parallel LINQ.

I would argue that your use of PrintProcessName is an odd one for a pipeline, to yield an empty result at the end. I think I'd rather implement a ForEach extension method on IEnumerable<T> which takes an Action<T>. (I'm kinda surprised that isn't in the framework already, to be honest.)

At that point, your code would become:

Process.GetProcesses()

.Where(proc => proc.WorkingSet64 > 50*1024*1024)

.ForEach(proc => Console.WriteLine(proc.ProcessName));

No new types needed at all, except for the static class to hold the ForEach extension method. If you want to create extra classes for reusable logic, you certainly can - but you don't have to.
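The ForEach extension method described here is not part of the framework's LINQ operators, so it has to be written by hand. One possible sketch follows; EnumerableExtensions and ForEachDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

public static class EnumerableExtensions
{
    // Eagerly enumerates the source and runs the action on every item.
    public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (action == null) throw new ArgumentNullException("action");

        foreach (T item in source)
            action(item);
    }
}

public static class ForEachDemo
{
    public static void Main()
    {
        // Same behaviour as the three-stage pipeline from the post.
        Process.GetProcesses()
            .Where(proc => proc.WorkingSet64 > 50L * 1024 * 1024)
            .ForEach(proc => Console.WriteLine(proc.ProcessName));
    }
}
```

Unlike Where, ForEach returns nothing and enumerates immediately, so it also takes over the job of the empty while loop in Pipeline<T>.Execute.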

If the logic for any of the steps is complicated, you can stick that in a method easily, either casting the method group or just calling the method from a lambda expression. When the logic isn't complicated, do it inline as above.

Personally, I think that's more maintainable. We still have the composability and the streaming, but we also have all the standard query operators, the ability to use query expressions where you need to, etc.

I may have said it before on this blog, but I believe LINQ to Objects has been significantly under-marketed. LINQ to SQL has more of an "ooh, ahh" factor - but LINQ to Objects will be more applicable in many situations (and without forcing you to use SQL server!)

Ayende Rahien
01/06/2008 07:11 PM by
Ayende Rahien

1/ if they can't grok the concept in 30 minutes, they are not worth keeping. The idea of grabbing someone from the street is a myth.

2/ build a DelegateOperation(Action), and you are set

3/ In this scenario, I actually need it to keep one type all the way. I am doing transformations in a pipeline. If I wanted any type, I could have used IEnumerable instead of IEnumerable<T>

4/ interesting, probably the best point.

I said that the print processes example is trivial, right?

A more realistic sample would be:

1/ read customers from file

2/ left join to existing customers in database

3/ for all those missing from database:

3.1/ create customer record

3.2/ send email about new customer

4/ get all active orders in last day

5/ left join customers with orders

6/ update customer statistics

6.1 / amount bought

6.2 / favorite products

7/ update customers

Assume significant complexity for at least some of those steps.

Assume that I want to maintain separation of concerns.

Paul Stovell
01/07/2008 03:27 AM by
Paul Stovell

Using LINQ and extension methods does not mean you have to put all of your operation inside one method.

If you look at many extensions, they actually return a class (implementing IEnumerable<T>) which contains all the logic. The SyncLINQ source code certainly isn’t one 40,000 line class with 500 massive methods; I’m sure the LINQ source is similar.

ETL can't be done with LINQ? Here's how I'd do it:

var transformedCustomers = new AddAdditionalMetadataToCustomerOperation(
                           new ConvertCRMCustomerToSASCustomerOperation(
                              new SwapFirstNameAndLastNameOperation(
                                  new ImportCustomersFromCRMOperation(crmUrl))));

The nice thing about IEnumerable and not fixing the inputs/outputs is each class can be defined differently:

class AddAdditionalMetadataToCustomerOperation : IEnumerable

public new (IEnumerable);

class ConvertCRMCustomerToSASCustomerOperation : IEnumerable

public new (IEnumerable);

class SwapFirstNameAndLastNameOperation: IEnumerable

public new (IEnumerable);

class ImportCustomersFromCRMOperation: IEnumerable

public new (string crmUrl);

Note that each operation can return different types, and take different types as inputs. Since you’re using classes, you can use inheritance. It has all the capabilities of the solution you blogged about. You can add multithreading, processor yielding, whatever you want.

Then, for readability only, you can wrap it in some LINQ extensions to become:

var sasCustomers = ImportCustomersFromCRM(url)

               .SwapFirstAndLastName()

               .ConvertToSasCustomers()

               .AddAdditionalMetadata();

And like any good pipeline, it reads from right to left :)

Jon Skeet
01/07/2008 08:55 AM by
Jon Skeet

Is your example with customers meant to be a single pipeline, or two? There seems to be a disconnect at item 4. If there isn't, I don't quite understand it - but it raises an interesting issue anyway.

One issue with the "pull" model of LINQ is it assumes there's basically one consumer. If you want to split a pipeline, it's relatively hard to do without threading. Just as a sort of plug, and because you might find it interesting, have a look at my blog entry about "push" LINQ:

http://msmvps.com/blogs/jon.skeet/archive/2008/01/04/quot-push-quot-linq-revisited-next-attempt-at-an-explanation.aspx

(Or at the moment, just the top entry at http://msmvps.com/jon.skeet)

It supports splitting at any point very naturally, although I haven't used that for anything other than simple cases.

I'd expect all the steps you've mentioned to be feasible with LINQ though - as I say in the fluent pipelines thread, you can still use methods as delegate targets too...

Jon

wow
01/13/2008 06:48 AM by
wow

wow, I just stumbled upon this and will bookmark this.. my understanding of LINQ to Objects is 10x. Thank you Thank you!
