Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database


Another advantage of using the pipes and filters approach is that it is naturally thread safe. Only a single filter is handling an item at any given time, even if the pipes are all running on multiple threads.

This means that we have a very simple way to introduce threading into the system, without any hassle whatsoever. Let us take a look at the single threaded pipeline:

public class SingleThreadedPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public SingleThreadedPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}

Very simple, except for the final loop, which is what pushes the entire pipeline along. Now, what do we need in order to make this multi threaded?

Well, what do I mean when I talk about multi threading? I mean that we will execute all the operations concurrently, so they can process different parts of the pipeline at the same time. This allows us to make better use of our computing resources.

Here is the code:

public class ThreadPoolPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public ThreadPoolPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            IEnumerable<T> execute = operation.Execute(current);
            current = StartConsuming(execute);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
    private ThreadSafeEnumerator<T> StartConsuming(IEnumerable<T> enumerable)
    {
        ThreadSafeEnumerator<T> threadSafeEnumerator = new ThreadSafeEnumerator<T>();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                foreach (T t in enumerable)
                {
                    threadSafeEnumerator.AddItem(t);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                threadSafeEnumerator.MarkAsFinished();
            }
        });
        return threadSafeEnumerator;
    }
}

We are using ThreadSafeEnumerator here, and passing a callback to the thread pool that executes the previous part of the pipeline and pushes its results into the enumerator, which the next operation then consumes.

This is just an advanced version of decorators.

The implementation of ThreadSafeEnumerator is about as simple as multi threaded code can be:

public class ThreadSafeEnumerator<T> : IEnumerable<T>, IEnumerator<T>
{
    private bool active = true;
    private readonly Queue<T> cached = new Queue<T>();
    private T current;

    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }

    public T Current
    {
        get { return current; }
    }

    public void Dispose()
    {
        cached.Clear();
    }

    public bool MoveNext()
    {
        lock (cached)
        {
            while (cached.Count == 0 && active)
                Monitor.Wait(cached);

            if (active == false && cached.Count == 0)
                return false;

            current = cached.Dequeue();

            return true;
        }
    }

    public void Reset()
    {
        throw new NotSupportedException();
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    public void AddItem(T item)
    {
        lock (cached)
        {
            cached.Enqueue(item);
            Monitor.Pulse(cached);
        }
    }

    public void MarkAsFinished()
    {
        lock(cached)
        {
            active = false;
            Monitor.Pulse(cached);
        }
        
    }
}

The real magic happens in MoveNext(), with support from AddItem() and MarkAsFinished().
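For comparison, the same blocking hand-off can be sketched with BlockingCollection<T> from System.Collections.Concurrent. This is not what the post uses, and the HandOffSketch class name is made up for illustration. It only shows how AddItem() and MarkAsFinished() map onto a ready-made producer/consumer buffer, assuming a runtime where that type is available.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not part of the original post.
public static class HandOffSketch
{
    // Runs the source on a thread pool thread and returns a sequence
    // that blocks the consumer until items arrive.
    public static IEnumerable<T> StartConsuming<T>(IEnumerable<T> source)
    {
        var buffer = new BlockingCollection<T>();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                foreach (T item in source)
                    buffer.Add(item);        // plays the role of AddItem()
            }
            catch (Exception e)
            {
                Console.WriteLine(e);        // same error policy as the post
            }
            finally
            {
                buffer.CompleteAdding();     // plays the role of MarkAsFinished()
            }
        });

        // Waits while the buffer is empty, ends once adding is completed
        // and the remaining items have been drained.
        return buffer.GetConsumingEnumerable();
    }
}
```

With a single producer the items come out in the order they were added, which matches the Queue<T> based version above.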

That is it: ThreadPoolPipeline and ThreadSafeEnumerator are all we need to make everything else multi threaded.

Note that the ThreadPoolPipeline assumes that you can execute all the operations concurrently, which may not be the case if you have a lot of them (over 25 per CPU, the default thread pool limit). At that point, we would need to implement coroutines for the ThreadSafeEnumerator, instead of just blocking the thread.
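The actual limit differs between runtime versions, so it may be worth checking rather than assuming. A minimal sketch, using the real ThreadPool.GetMaxThreads call wrapped in a hypothetical ThreadPoolLimits helper:

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the original post.
public static class ThreadPoolLimits
{
    // Returns the maximum number of worker threads the pool will create.
    public static int MaxWorkerThreads()
    {
        int workerThreads, completionPortThreads;
        ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
        return workerThreads;
    }

    // True when every operation could, in principle, hold its own pool thread.
    // Other code in the process shares the same pool, so treat this as an
    // optimistic upper bound, not a guarantee.
    public static bool CanRunConcurrently(int operationCount)
    {
        return operationCount <= MaxWorkerThreads();
    }
}
```

Since each registered operation blocks one pool thread for the lifetime of the pipeline, a check like CanRunConcurrently(operations.Count) could run before Execute() starts queuing work.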

time to read 5 min | 879 words

Pipes are very common in computing. They are a very good way to turn a complex problem into a set of small problems. You are probably familiar with the pattern, even if not explicitly.

  • The ASP.Net Http Pipeline (Begin_Request, Authorize_Request, etc.)
  • Compiler Pipelines (Parse, ProcessTypes, SaveAssembly, etc)
  • Command Line piping (ps -ax | grep Finder)

What I wanted to talk about today was how to implement this in code. I did several implementations of pipes and filters in the past, and they were all overly complex. I took this weekend to look at the problem again, and I came up with a ridiculously simple solution.

In a nutshell, here it is:


We have a pipeline that is composed of operations. Each operation accepts an input and returns an output. The use of IEnumerable<T> means that we can stream items through the entire process without any effort whatsoever.
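The IOperation<T> interface itself is not listed in the post. Judging from how the operations below implement it, it presumably looks something like this sketch. PassThrough<T> is a hypothetical operation added only to show the shape of an implementation.

```csharp
using System.Collections.Generic;

// Assumed shape of the interface, inferred from the operations in this post.
public interface IOperation<T>
{
    // Takes the output of the previous stage and returns this stage's output.
    IEnumerable<T> Execute(IEnumerable<T> input);
}

// Hypothetical operation: forwards every item unchanged, one at a time.
public class PassThrough<T> : IOperation<T>
{
    public IEnumerable<T> Execute(IEnumerable<T> input)
    {
        foreach (T item in input)
            yield return item;
    }
}
```

Because both the input and the output are IEnumerable<T>, any operation can be plugged in after any other operation of the same T.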

Most problems that call for the pipeline approach are fairly complex, so picking a simple example means that it is trivial to implement it otherwise. Let us go with the really trivial sample of printing all the processes whose working set is greater than 50 MB.

We have three stages in the pipeline, the first, get processes:

public class GetAllProcesses : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        return Process.GetProcesses();
    }
}

The second, limit by working set size:

public class LimitByWorkingSetSize : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        // Threshold: only processes whose working set exceeds 50 MB pass through
        int minSizeBytes = 50 * 1024 * 1024;
        foreach (Process process in input)
        {
            if (process.WorkingSet64 > minSizeBytes)
                yield return process;
        }
    }
}

The third, print process name:

public class PrintProcessName : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        foreach (Process process in input)
        {
            System.Console.WriteLine(process.ProcessName);
        }
        yield break;
    }
}

All of those are very trivial implementations. You can see that the GetAllProcesses class doesn't care about its input; it is the source. LimitByWorkingSetSize iterates over the input and uses "yield return" to stream the results to the next step, PrintProcessName. Since this step is the final one, we use "yield break" to make it compile without returning anything. (We could return null, but that would be rude.)

It is important to note that the second stage uses the if to control what gets passed downstream.

Now we only have to bring them together, right?

public class TrivialProcessesPipeline : Pipeline<Process>
{
    public TrivialProcessesPipeline()
    {
        Register(new GetAllProcesses());
        Register(new LimitByWorkingSetSize());
        Register(new PrintProcessName());
    }
}

Now, executing this pipeline will execute all three steps, in a streaming fashion.

Okay, this is a lot of code that we can replace with the following snippet:

int minSizeBytes = 50 * 1024 * 1024;
foreach (Process process in Process.GetProcesses())
{
     if (process.WorkingSet64 > minSizeBytes)
         System.Console.WriteLine(process.ProcessName);
}

What are we getting from this?

Composability and streaming. When we execute the pipeline, we are not executing each step in turn, we are executing them all in parallel. (Well, not in parallel, but together.)
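To see what "together" means here, this small sketch logs the order in which two chained iterator stages run. The names are hypothetical and it uses plain integers instead of processes. Each item travels through all the stages before the next item is produced.

```csharp
using System.Collections.Generic;

// Hypothetical demo, not part of the original post.
public static class StreamingDemo
{
    public static readonly List<string> Log = new List<string>();

    // Source stage: produces 1, 2, 3 lazily.
    static IEnumerable<int> Produce()
    {
        for (int i = 1; i <= 3; i++)
        {
            Log.Add("produce " + i);
            yield return i;
        }
    }

    // Middle stage: doubles each item as it is pulled.
    static IEnumerable<int> Double(IEnumerable<int> input)
    {
        foreach (int i in input)
        {
            Log.Add("double " + i);
            yield return i * 2;
        }
    }

    // Final consumer: pulling items is what drives both stages.
    public static void Run()
    {
        Log.Clear();
        foreach (int i in Double(Produce()))
            Log.Add("consume " + i);
    }
}
```

After Run(), the log starts with "produce 1", "double 1", "consume 2" and only then moves on to "produce 2". Nothing is buffered between the stages, which is why the pipeline can handle inputs that do not fit in memory.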

Hey, I didn't show you how the Pipeline<T> was implemented, right?

public class Pipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public Pipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}

I'll leave you to ponder that.
