Pipes and filters: The multi threaded version

Another advantage of the pipes and filters approach is that it is naturally thread safe, as long as the filters themselves don't share mutable state. Only a single filter handles a given item at any one time, even when the stages of the pipeline all run on multiple threads.
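The "one filter per item at a time" property is easy to observe with iterator-based filters. In this sketch, the filters are assumed to be written with `yield return`, and `LazyTraceDemo` and `Tag` are hypothetical names. Each filter logs the items it touches:

```csharp
using System.Collections.Generic;

// Hypothetical demo: two lazy filters that log every item they handle.
public static class LazyTraceDemo
{
    // A filter written as an iterator: its body only runs when someone pulls from it.
    static IEnumerable<int> Tag(string name, IEnumerable<int> input, List<string> log)
    {
        foreach (int item in input)
        {
            log.Add(name + ":" + item);
            yield return item;
        }
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        // Building the chain executes no filter code yet.
        IEnumerable<int> pipeline = Tag("B", Tag("A", new[] { 1, 2, 3 }, log), log);
        foreach (int item in pipeline)
        {
            // Pulling one item out of B first pulls exactly one item out of A.
        }
        return log;
    }
}
```

The log reads `A:1, B:1, A:2, B:2, A:3, B:3`: item 1 has left filter A before filter B sees it, and A does not touch item 2 until B asks for it, so no item is ever owned by two filters at once.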

This means that we have a very simple way to introduce threading into the system, without any hassle whatsoever. Let us take a look at the single threaded pipeline:

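using System.Collections.Generic;

// Assumed shape of the operation interface, inferred from how Execute is called below:
// each filter takes a sequence and returns a new one.
public interface IOperation<T>
{
    IEnumerable<T> Execute(IEnumerable<T> input);
}

public class SingleThreadedPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public SingleThreadedPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        // The first operation starts from an empty input and produces the items.
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            // Chain the operations; if they are iterators, nothing runs yet.
            current = operation.Execute(current);
        }
        // Pulling every item out of the last operation drives the whole pipeline.
        using (IEnumerator<T> enumerator = current.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
            }
        }
    }
}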

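Very simple, except for the final loop, which is what pushes the entire pipeline along: the operations are lazy, so nothing happens until something pulls items out of the last one. Now, what do we need in order to make this multi threaded?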

What do I mean by multi threaded? I mean that we execute all the operations concurrently, so that each one can work on a different part of the data at the same time. This lets us make better use of our computing resources.

Here is the code:

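using System;
using System.Collections.Generic;
using System.Threading;

public class ThreadPoolPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public ThreadPoolPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            IEnumerable<T> execute = operation.Execute(current);
            // Run this operation on the thread pool; the next operation reads
            // its output from the returned ThreadSafeEnumerator.
            current = StartConsuming(execute);
        }
        // Draining the last stage blocks the caller until the whole pipeline is done.
        using (IEnumerator<T> enumerator = current.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
            }
        }
    }

    private ThreadSafeEnumerator<T> StartConsuming(IEnumerable<T> enumerable)
    {
        ThreadSafeEnumerator<T> threadSafeEnumerator = new ThreadSafeEnumerator<T>();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                // Pull every item from this stage (on a pool thread) and hand it over.
                foreach (T t in enumerable)
                {
                    threadSafeEnumerator.AddItem(t);
                }
            }
            catch (Exception e)
            {
                // Errors are only logged; the stage then ends as if its input were exhausted.
                Console.WriteLine(e);
            }
            finally
            {
                // Always signal completion so the consumer does not wait forever.
                threadSafeEnumerator.MarkAsFinished();
            }
        });
        return threadSafeEnumerator;
    }
}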

We are using ThreadSafeEnumerator here. For each operation, we pass a callback to the thread pool that runs the operation and pushes its results into a ThreadSafeEnumerator, which in turn becomes the input of the next operation in the pipeline.

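This is just an advanced version of the decorator pattern: StartConsuming wraps each operation's output in another IEnumerable<T>, so the next operation cannot tell the difference.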
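On .NET 4.5 and later, the framework's `BlockingCollection<T>` can play the role of the hand-written ThreadSafeEnumerator. The sketch below swaps it in; `BlockingStage` and `Demo` are hypothetical names, and `Task.Run` stands in for `ThreadPool.QueueUserWorkItem` (both run the work on the thread pool). The decorator shape is the same: a sequence goes in, a sequence comes out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Sketch: StartConsuming rebuilt on BlockingCollection<T> instead of ThreadSafeEnumerator.
public static class BlockingStage
{
    // Enumerates 'source' on a pool thread and hands its items over through a
    // thread-safe queue; the caller still sees a plain IEnumerable<T>.
    public static IEnumerable<T> StartConsuming<T>(IEnumerable<T> source)
    {
        var buffer = new BlockingCollection<T>();
        Task.Run(() =>
        {
            try
            {
                foreach (T item in source)
                    buffer.Add(item);
            }
            catch (Exception e)
            {
                // Mirrors the original: log and end the stage.
                Console.WriteLine(e);
            }
            finally
            {
                // Equivalent of MarkAsFinished: lets the consumer's loop end.
                buffer.CompleteAdding();
            }
        });
        // Blocks while the buffer is empty; ends once CompleteAdding has been
        // called and every queued item has been taken.
        return buffer.GetConsumingEnumerable();
    }

    // Two chained stages, each running on its own pool thread.
    public static List<int> Demo()
    {
        IEnumerable<int> numbers = StartConsuming(Enumerable.Range(1, 5));
        IEnumerable<int> doubled = StartConsuming(numbers.Select(n => n * 2));
        return doubled.ToList();
    }
}
```

`Demo()` returns 2, 4, 6, 8, 10. Order is preserved because each stage has a single producer and `BlockingCollection<T>` defaults to a FIFO `ConcurrentQueue<T>`.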

The implementation of ThreadSafeEnumerator is about as simple as multi threaded code can be:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

public class ThreadSafeEnumerator<T> : IEnumerable<T>, IEnumerator<T>
{
    private bool active = true;
    private readonly Queue<T> cached = new Queue<T>();
    private T current;

    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }

    public T Current
    {
        get { return current; }
    }

    public void Dispose()
    {
        // Queue<T> is not thread safe, so take the same lock the producer uses.
        lock (cached)
        {
            cached.Clear();
        }
    }

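    public bool MoveNext()
    {
        lock (cached)
        {
            // Block until the producer has queued an item or has finished.
            // Monitor.Wait releases the lock while waiting and reacquires it on wake-up.
            while (cached.Count == 0 && active)
                Monitor.Wait(cached);

            // Finished and fully drained: the sequence is over.
            if (active == false && cached.Count == 0)
                return false;

            current = cached.Dequeue();

            return true;
        }
    }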

    public void Reset()
    {
        throw new NotSupportedException();
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

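    // Called by the producer for every item; wakes the consumer waiting in MoveNext().
    public void AddItem(T item)
    {
        lock (cached)
        {
            cached.Enqueue(item);
            Monitor.Pulse(cached);
        }
    }

    // Called by the producer when it is done, so MoveNext() returns false once the queue is empty.
    public void MarkAsFinished()
    {
        lock (cached)
        {
            active = false;
            Monitor.Pulse(cached);
        }
    }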
}

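The real magic happens in MoveNext(), which blocks on Monitor.Wait() until an item is available or the producer is done, with support from AddItem() and MarkAsFinished(), which wake it up with Monitor.Pulse().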
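The Wait/Pulse protocol can be exercised on its own. This is a stripped-down sketch, not the class above: `HandOff<T>` and `HandOffDemo` are hypothetical names, `Add`/`Finish`/`TryTake` mirror AddItem/MarkAsFinished/MoveNext, and `Finish` uses `Monitor.PulseAll`, which only makes a difference when more than one thread consumes.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical minimal hand-off using the same Monitor protocol as ThreadSafeEnumerator.
public class HandOff<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private bool active = true;

    public void Add(T item)
    {
        lock (queue)
        {
            queue.Enqueue(item);
            Monitor.Pulse(queue); // wake a consumer blocked in TryTake
        }
    }

    public void Finish()
    {
        lock (queue)
        {
            active = false;
            Monitor.PulseAll(queue); // wake every waiter so each one sees "finished"
        }
    }

    // Same shape as MoveNext: block until there is an item or the producer is done.
    // The while loop re-checks the condition after every wake-up, the standard Monitor
    // pattern, since another thread may change the queue before this one reacquires the lock.
    public bool TryTake(out T item)
    {
        lock (queue)
        {
            while (queue.Count == 0 && active)
                Monitor.Wait(queue);

            if (queue.Count == 0)
            {
                item = default(T);
                return false; // finished and drained
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

public static class HandOffDemo
{
    public static List<int> Run()
    {
        var handOff = new HandOff<int>();
        var received = new List<int>();
        // Start the consumer first, so it is usually waiting before the first item arrives.
        var consumer = new Thread(() =>
        {
            int value;
            while (handOff.TryTake(out value))
                received.Add(value);
        });
        consumer.Start();

        for (int i = 1; i <= 100; i++)
            handOff.Add(i);
        handOff.Finish();

        consumer.Join(); // after Join, 'received' is safe to read on this thread
        return received;
    }
}
```

The consumer receives 1 through 100 in order and then leaves its loop, because `Finish` wakes it with an empty queue and `active` set to false.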

That is it: these two classes are all we need to make everything else multi threaded.

Note that this version assumes that all the operations can execute concurrently, which may not be the case if you have a lot of them (over 25 per CPU by default). At that point, we would need to implement coroutines for the ThreadSafeEnumerator, instead of just blocking the thread.
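Much later versions of C# (8.0 and up, on .NET Core 3.0 or later) offer something close to those coroutines: async iterators plus `System.Threading.Channels`. This is an illustrative sketch under those assumptions, not the design in this post: a stage with nothing to read awaits instead of blocking, so hundreds of stages do not need hundreds of pool threads. `ChannelStage`, `AddOne` and `Demo` are hypothetical names, and the operations here take `IAsyncEnumerable<T>` rather than `IEnumerable<T>`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelStage
{
    // Async counterpart of StartConsuming: copies 'source' into a channel on the
    // thread pool. While waiting for input the stage awaits, so no thread is blocked.
    public static ChannelReader<T> StartConsuming<T>(IAsyncEnumerable<T> source)
    {
        Channel<T> channel = Channel.CreateUnbounded<T>();
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (T item in source)
                    await channel.Writer.WriteAsync(item);
                channel.Writer.Complete();
            }
            catch (Exception e)
            {
                // Pass the failure on to whoever reads this channel.
                channel.Writer.Complete(e);
            }
        });
        return channel.Reader;
    }

    // Hypothetical operation in async form: adds one to every item.
    static async IAsyncEnumerable<int> AddOne(IAsyncEnumerable<int> input)
    {
        await foreach (int n in input)
            yield return n + 1;
    }

    // Chains 'stages' AddOne operations over the numbers 0..9.
    public static async Task<List<int>> Demo(int stages)
    {
        Channel<int> source = Channel.CreateUnbounded<int>();
        for (int i = 0; i < 10; i++)
            source.Writer.TryWrite(i);
        source.Writer.Complete();

        IAsyncEnumerable<int> current = source.Reader.ReadAllAsync();
        for (int s = 0; s < stages; s++)
            current = StartConsuming(AddOne(current)).ReadAllAsync();

        var results = new List<int>();
        await foreach (int n in current)
            results.Add(n);
        return results;
    }
}
```

`Demo(500)` chains 500 stages and yields 500 through 509. The blocking ThreadPoolPipeline would tie up a pool thread for each of those stages while it waits for input.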

posted on Sunday, January 06, 2008 4:17 AM

Feedback



# re: Pipes and filters: The multi threaded version 1/6/2008 8:42 AM Reshef

Take a look at Google's map-reduce. It really resembles what you showed here, especially the multi threaded version, except that they use it for massive computations on grids of computers. I think that with a little remoting or WCF your pipeline implementation can do something similar.



# re: Pipes and filters: The multi threaded version 1/6/2008 8:43 AM Reshef

Forgot to put this link:
http://labs.google.com/papers/mapreduce.html
For the map reduce...



# re: Pipes and filters: The multi threaded version 1/6/2008 12:05 PM Patrick Smacchia

In my book Practical .NET2 and C#2 I explain how to code pipelines with C#2 iterators (the yield return and yield break keywords). The result is a super concise syntax. For example:
http://www.practicaldot.net/Chapter_14/Listing_14_47.htm

And also how to compute prime numbers with pipes in fewer than 20 lines of code.
http://www.practicaldot.net/Chapter_14/Listing_14_50.htm

The whole chapter is available as an article here:
http://65.214.43.45/tt/articles/showarticle.tss?id=IteratorsWithC2



# re: Pipes and filters: The multi threaded version 1/6/2008 5:01 PM Jon Skeet

Having rediscovered LINQ as pipes and filters, I think you've now rediscovered Parallel LINQ :)

Seriously, it would be worth looking into LINQ further, and the Parallel Extensions. There's a CTP available at
http://www.microsoft.com/downloads/details.aspx?FamilyId=E848DC1D-5BE3-4941-8705-024BC7F180BA&displaylang=en

Without in any way wishing to cast aspersions on your multithreading abilities, the guys behind Parallel LINQ have spent a long time on it and are seriously smart on concurrency. I'm sure they'd value your input, too.

Jon



# re: Pipes and filters: The multi threaded version 1/6/2008 5:05 PM Ayende Rahien

Jon,
If I never have to write multi threaded code again, I'll be very happy.
See my response about pipelines & LINQ in the previous thread.
I don't think that you can scale the syntax to be appropriate.

I would love to see myself proved wrong, however.



# re: Pipes and filters: The multi threaded version 1/6/2008 8:46 PM Jeremy Gray

I've said it before but I'll say it again: you really need to go catch up on the ParallelFX libraries. The CTP is available and there are great videos on Channel9. You'll find parallel for loops, parallel foreach, parallel LINQ (over objects at the least, not sure about the others), tasks, futures, all with configurable parallelization, work-stealing, ordering, etc. It is well worth checking out, if only to help distinguish your goals and approach from theirs. It'd help us readers for that reason, too. ;)



# re: Pipes and filters: The multi threaded version 1/8/2008 5:04 PM Vijay SAnthanam

Hi Ayende,

I like your pipe and filter implementation, it's super concise and effective.

But I'm curious how you intend to implement forks and joins into your pipe and filter pattern. I'd like to see you try this out :)



# re: Pipes and filters: The multi threaded version 1/16/2008 9:31 AM Ayende Rahien

See my latest post about Rhino ETL for the answer.
