Ayende @ Rahien

Refunds available at head office

Beware of big Task Parallel Library Operations

Take a look at the following code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var list = Enumerable.Range(0, 10 * 1000).ToList();

        var task = ProcessList(list, 0);


        Console.WriteLine(task.Result);

    }

    private static Task<int> ProcessList(List<int> list, int pos, int acc = 0)
    {
        if (pos >= list.Count)
        {
            var tcs = new TaskCompletionSource<int>();
            tcs.TrySetResult(acc);
            return tcs.Task;
        }

        return Task.Factory.StartNew(() => list[pos] + acc)
            .ContinueWith(task => ProcessList(list, pos + 1, task.Result))
            .Unwrap();
    }
}

This is a fairly standard piece of code, which does a “complex” async process and then moves on. It is important in this case to do the operations in the order they were given, and the real code is actually doing something that needs to be async (go and fetch some data from a remote server).

It is probably easier to figure out what is going on when you look at the C# 5.0 code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var list = Enumerable.Range(0, 10 * 1000).ToList();

        var task = ProcessList(list, 0);

        Console.WriteLine(task.Result);

    }

    private async static Task<int> ProcessList(List<int> list, int pos, int acc = 0)
    {
        if (pos >= list.Count)
        {
            return acc;
        }

        var result = await Task.Factory.StartNew(() => list[pos] + acc);

        return await ProcessList(list, pos + 1, result);
    }
}

I played with user mode scheduling in .NET a few times in the past, and one of the things that I was never able to resolve properly was the issue of the stack depth. I hoped that the TPL would resolve it, but it appears that it didn’t. Both code samples here will throw StackOverflowException when run.

It sucks, quite frankly. I understand why this is done this way, but I am quite annoyed by this. I expected this to be solved somehow. Using C# 5.0, I know how to solve this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var list = Enumerable.Range(0, 10 * 1000).ToList();

        var task = ProcessList(list);

        Console.WriteLine(task.Result);

    }

    private async static Task<int> ProcessList(List<int> list)
    {
        var acc = 0;
        foreach (var i in list)
        {
            var currentAcc = acc;
            acc += await Task.Factory.StartNew(() => i + currentAcc);
        }
        return acc;
    }
}

The major problem is that I am not sure how to translate this code to C# 4.0. Any ideas?

Comments

configurator
04/12/2012 02:06 PM by
configurator

You'd have to do a similar translation to the one done by syntactic sugar in C# 5, which is quite a complicated translation. I'd just look at what a decompiler outputs and base my code on that, probably.

josh
04/12/2012 02:20 PM by
josh

Is that last method valid? i.e. is there an implicit conversion from int to Task?

AlfeG
04/12/2012 02:20 PM by
AlfeG

Maybe this can help? http://www.simple-talk.com/community/blogs/alex/archive/2012/04/08/107197.aspx

gandjustas
04/12/2012 02:36 PM by
gandjustas

Do it the same way the C# 5 compiler translates this code.

Rewrite the loop as a state machine, create an object for the loop variables, and pass this object as a function parameter to the continuation.

Maybe Parallel LINQ solves your problem? Or use an iterator with the code from PXX: http://code.msdn.microsoft.com/ParExtSamples Or use Rx.
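
The state-machine translation described in this comment could look roughly like the following in C# 4.0. This is only a sketch under assumptions: SumLoop and its members are made-up names, and the Task.Factory.StartNew call stands in for the real async operation. The loop variables live in fields, and each continuation re-enters the same MoveNext method instead of nesting a new task inside the previous one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical class: holds what would be the locals of the async method.
sealed class SumLoop
{
    private readonly List<int> list;
    private readonly TaskCompletionSource<int> result = new TaskCompletionSource<int>();
    private int pos;
    private int acc;

    private SumLoop(List<int> list) { this.list = list; }

    public static Task<int> Run(List<int> list)
    {
        var loop = new SumLoop(list);
        loop.MoveNext(null);
        return loop.result.Task;
    }

    // Resumes the loop; 'completed' is the step that just finished, or null on first entry.
    private void MoveNext(Task<int> completed)
    {
        try
        {
            while (true)
            {
                if (completed != null)
                {
                    acc = completed.Result; // rethrows if the step faulted
                    pos++;
                }
                if (pos >= list.Count)
                {
                    result.TrySetResult(acc);
                    return;
                }

                int item = list[pos];
                int soFar = acc;
                // Stand-in for the real async operation (assumption).
                Task<int> step = Task.Factory.StartNew(() => item + soFar);
                if (!step.IsCompleted)
                {
                    // Default options: the continuation is queued to the scheduler
                    // rather than run on top of the current stack.
                    step.ContinueWith(t => MoveNext(t));
                    return;
                }
                completed = step; // already finished: keep looping instead of recursing
            }
        }
        catch (Exception e)
        {
            result.TrySetException(e);
        }
    }
}

static class Program
{
    static void Main()
    {
        var list = Enumerable.Range(0, 10 * 1000).ToList();
        Console.WriteLine(SumLoop.Run(list).Result); // prints 49995000
    }
}
```

Only one TaskCompletionSource is created for the whole loop, so there is no chain of nested tasks to complete one inside the other at the end.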

Geert Baeyaert
04/12/2012 02:43 PM by
Geert Baeyaert

I've just tried this on my machine with a list of one million ints, and I don't get a StackOverflowException.

josh
04/12/2012 02:44 PM by
josh

Ayende,

Perhaps I'm missing something about why it's necessary to have more than one task here? Can you explain that a little?

The following achieves the result you're after:

    private static Task<int> ProcessList(IEnumerable<int> list)
    {
        var task = Task.Factory.StartNew(
            () => list.Aggregate(0, (acc, i) => acc + i));
        return task;
    }

email omitted as it's night here and I don't want the beeps of auto-replies on my phone.

Ryan
04/12/2012 03:10 PM by
Ryan

You can actually use the TPL functions that came with .NET 4.0 to process this sequentially.

    private static Task<int> ProcessList(List<int> list, int pos, int acc = 0)
    {
        if (pos >= list.Count)
        {
            var tcs = new TaskCompletionSource<int>();
            tcs.TrySetResult(acc);
            return tcs.Task;
        }

        return Task.Factory.StartNew(() => list.AsParallel().AsSequential().Skip(pos).Sum(i => i) + acc);
    }

Ryan
04/12/2012 03:11 PM by
Ryan

Man my formatting got removed


Ryan
04/12/2012 03:12 PM by
Ryan

Ok i give up

Aaron
04/12/2012 03:18 PM by
Aaron

You could try the AsyncBridge (https://nuget.org/packages/AsyncBridge), if you can compile with VS 2011.

Karg
04/12/2012 03:23 PM by
Karg

Don't forget that he said the actual work being done is async remote calls, not just summing numbers as in the simple example.

State machine is the way to replicate the 5.0 code

Omer Mor
04/12/2012 03:25 PM by
Omer Mor

First, you can use the C# 5 compiler with .NET 4.0. Alex Davies and I adapted some code written by Daniel Grunwald into a library called AsyncBridge. It's similar to LinqBridge in how it allows you to use the new async features with .NET 4 & .NET 3.5. You can get the nuget (https://nuget.org/packages/AsyncBridge) or the source (https://github.com/OmerMor/AsyncBridge).

But more specific to your problem, I'd write it using Rx:

var result = await Enumerable .Range(0, 10 * 1000) .ToObservable() .ObserveOn(Scheduler.TaskPool) .Select(i => fetchDataFromRemoteServerFor(i)) .Sum(); // or use more complex aggregations with .Aggregate(...)

Omer Mor
04/12/2012 03:30 PM by
Omer Mor

Oren, you should fix the formatting for the comments in your blog. You should probably use Markdown syntax with preview; after all, it is a programming blog and comments often contain code.

Omer Mor
04/12/2012 03:31 PM by
Omer Mor

Here's another attempt at formatting the Rx query:

    var result = await Enumerable
        .Range(0, 10 * 1000)
        .ToObservable()
        .ObserveOn(Scheduler.TaskPool)
        .Select(i => fetchDataFromRemoteServerFor(i))
        .Sum(); // or use more complex aggregations with .Aggregate(...)

tobi
04/12/2012 03:32 PM by
tobi

Ayende,

the first one does not throw for me. The TPL has stack depth checks.

Can't test the second one but I suspect it won't throw either. RunSynchronously gets ignored if the stack is too deep.

tobi
04/12/2012 03:45 PM by
tobi

Ayende,

actually I'd like to repro this because I have such code in production. Under what circumstances will this cause a StackOverflow? I tried x86/x64 cross-product debug/(release without debugger) with 1000 * 1000. Works correctly.

Ayende Rahien
04/12/2012 04:49 PM by
Ayende Rahien

gandjustas, that is awesome, but it requires the VS 2011 compiler.

Ayende Rahien
04/12/2012 04:49 PM by
Ayende Rahien

Josh, Yes, when you are using an async method.

Ayende Rahien
04/12/2012 04:50 PM by
Ayende Rahien

Josh, assume that each of the operations is a separate async task. For example, an async web request.

Ayende Rahien
04/12/2012 04:51 PM by
Ayende Rahien

Ryan, except that in my case, I actually need to execute an OPERATION in sequence, i.e. making a lot of web requests.

Ayende Rahien
04/12/2012 04:51 PM by
Ayende Rahien

Aaron, No, I can't do that.

Ayende Rahien
04/12/2012 04:52 PM by
Ayende Rahien

Karg, My problem is, I have no idea how to do the waiting for the task inside my own task without actually holding up the thread.

Ayende Rahien
04/12/2012 04:53 PM by
Ayende Rahien

Omer, the actual operation I want to do is to run an async task (WebRequest.GetStringAsync()). I don't see how this would make it work, and I have to run this ONE at a time, in the same sequence.

Ayende Rahien
04/12/2012 04:55 PM by
Ayende Rahien

tobi, This is the code using a Console Application, .NET Client Profile 4.0 with x86. It is possible that you need to replace the math stuff with an actual blocking call, like a web request to reproduce that.

Bunter
04/12/2012 07:42 PM by
Bunter

Do I understand correctly: you are trying to implement, with the Task Parallel Library, a use case which is inherently not parallel?

Duarte
04/12/2012 08:34 PM by
Duarte

If you're using ContinueWith with the default TaskContinuationOptions, then there really should be no recursion, as the continuation will execute on a different thread (or the current one, after it returns and the stack unwinds). At least for the "server" profile they solve the recursion issue by using stack probes (via the Win32 api).

Sam
04/12/2012 09:32 PM by
Sam

As far as I can tell, it is possible for the first method to run without throwing StackOverflowException most of the time.

Doesn't Task.Factory.StartNew have an optimization that will run the body immediately if it's small? This would cause the StackOverflowException.

If the body were larger, then it would be captured into a lambda and run on the thread pool, which should make it not throw the exception.

If that is the case, adding TaskCreationOptions.PreferFairness should force it to always run on the thread pool.

The TPL CTP had AttachToParent as the default option, which caused a similar problem.

gandjustas
04/12/2012 11:46 PM by
gandjustas

TaskCreationOptions.PreferFairness doesn't solve the problem.

await Task.Factory.StartNew(method) almost always leads to "task inlining", where the task completes on the same thread as the parent task.

If you need a real async call, use Task.Factory.FromAsync with APM, or TaskCompletionSource with EAP.
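
As one possible illustration of the FromAsync suggestion, the sketch below wraps an APM Begin/End pair in a task. It uses a MemoryStream purely so the sample is self-contained; FromAsyncSketch and ReadAsyncApm are made-up names, and a real version would wrap the Begin/End methods of the actual remote call instead.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

static class FromAsyncSketch
{
    // Wraps the APM pair Stream.BeginRead / Stream.EndRead in a Task<int>.
    // The task result is the number of bytes read into the buffer.
    public static Task<int> ReadAsyncApm(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead,
            buffer, 0, buffer.Length, null);
    }

    static void Main()
    {
        // In-memory stream used only to keep the sample runnable (assumption).
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("hello"));
        var buffer = new byte[16];

        // Decode the bytes once the read has completed.
        Task<string> text = ReadAsyncApm(stream, buffer)
            .ContinueWith(read => Encoding.UTF8.GetString(buffer, 0, read.Result));

        // Blocking on Result is only for the console demo.
        Console.WriteLine(text.Result);
    }
}
```

The task produced this way represents the operation itself, so no thread pool thread is occupied just to wait for it.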

Ayende Rahien
04/12/2012 07:50 PM by
Ayende Rahien

Bunter, what I am trying to do is to create an async process in which I do a set of async operations in sequence.

Patrick Huizinga
04/12/2012 09:36 PM by
Patrick Huizinga

would this do the trick? (not tested, except in my head)

    class Program
    {
        static void Main()
        {
            var list = Enumerable.Range(0, 10000).ToList();
            var task = new ProcessList(list).Task;
            Console.WriteLine(task.Result);
        }

        sealed class ProcessList
        {
            readonly List<int> list;
            readonly TaskCompletionSource<int> result = new TaskCompletionSource<int>();
            int index, acc;

            public ProcessList(List<int> list)
            {
                this.list = list;
                MoveNext();
            }

            public Task<int> Task { get { return result.Task; } }

            void MoveNext()
            {
                if (list.Count == index)
                {
                    result.TrySetResult(acc);
                }
                else
                {
                    // Fully qualified so that 'Task' is not resolved to the Task property above.
                    System.Threading.Tasks.Task.Factory.StartNew(ProcessStep);
                }
            }

            void ProcessStep()
            {
                try
                {
                    acc += list[index];
                    index++;
                    MoveNext();
                }
                catch (Exception e)
                {
                    result.TrySetException(e);
                }
            }
        }
    }

Patrick Huizinga
04/12/2012 09:45 PM by
Patrick Huizinga

@Ayende: Karg, My problem is, I have no idea how to do the waiting for the task inside my own task without actually holding up the thread.

wouldn't a combination of TaskCompletionSource and Task.ContinueWith() do the trick?

so basically:

    var source = new TaskCompletionSource
    GetTask().ContinueWith(t => source.TrySetX(...))
    return source.Task

inside the ContinueWith you can do whatever you want

Ayende Rahien
04/12/2012 09:46 PM by
Ayende Rahien

Patrick, YES, that would do it. I never considered just starting up a new task, instead of using ContinueWith. Very good point.

Ayende Rahien
04/12/2012 09:46 PM by
Ayende Rahien

Patrick, if you use ContinueWith, that is going to cause the StackOverflow; without it, it works.

Patrick Huizinga
04/12/2012 09:51 PM by
Patrick Huizinga

Hmm, can it be that in order for your code to show properly you need to have 2 newlines and indent it all with 4 spaces?

Just like StackOverflow, could've guessed that...

Bunter
04/13/2012 12:52 AM by
Bunter

Since you need synchronization anyway (calling operations in a given sequence), why bother with an async task for every single operation when you could just have a single async task doing a loop and blocking?

josh
04/13/2012 01:08 AM by
josh

So, how about this?

    private static Task<int> ProcessList(IEnumerable<int> list)
    {

        var task = Task.Factory.StartNew(
            () =>
                {
                    var acc = 0;
                    foreach (int item in list)
                        acc += int.Parse(GetStringAsync(item).Result);
                    return acc;
                });
        return task;
    }

    private static Task<string> GetStringAsync(int item)
    {
        return Task.Factory.StartNew(() => item.ToString());
    }
gandjustas
04/13/2012 02:17 AM by
gandjustas

The stack overflow in this case is not caused by the task calls; it is caused by task completion.

    private async static Task<int> ProcessList(List<int> list, int pos, int acc = 0)
    {
        if (pos >= list.Count)
        {
            return acc; //(1)
        }

        var result = await Task.Factory.StartNew(() => list[pos] + acc);

        return await ProcessList(list, pos + 1, result);
    }

The task returned by ProcessList doesn't complete until all the inner tasks complete. The recursive calls create nested tasks on the heap, without a stack overflow. When execution reaches line (1), the innermost task completes, and that causes its parent task to complete synchronously (adding about 5 calls to the stack), and so on up the chain.

This "task unwind" is what causes the StackOverflow.

Continuations should be chained, not nested. The TPL is very badly designed in this respect. The TPL should be considered a low-level async library. For production code, prefer Rx.

gandjustas
04/13/2012 02:32 AM by
gandjustas

Code for chaining (not nesting) tasks:

    private static Task<int> ProcessList(List<int> list, int pos, int acc = 0)
    {
        var resultTcs = new TaskCompletionSource<int>();

        if (pos >= list.Count)
        {
            resultTcs.TrySetResult(acc);

        }
        else
        {
            Task.Factory.StartNew(() => list[pos] + acc).ContinueWith(task =>
                {
                    if (task.IsCanceled)
                    {
                        resultTcs.TrySetCanceled();
                    }
                    else if (task.IsFaulted)
                    {
                        resultTcs.TrySetException(task.Exception.InnerExceptions);
                    }
                    else
                    {
                        ProcessList(list, pos + 1, task.Result).ContinueWith(loop =>
                            {

                                if (loop.IsCanceled)
                                {
                                    resultTcs.TrySetCanceled();
                                }
                                else if (loop.IsFaulted)
                                {
                                    resultTcs.TrySetException(loop.Exception.InnerExceptions);
                                }
                                else
                                {
                                    resultTcs.TrySetResult(loop.Result);
                                }
                            }, TaskContinuationOptions.ExecuteSynchronously);


                    }

                }, TaskContinuationOptions.ExecuteSynchronously);
        }

        return resultTcs.Task;
    }
Ayende Rahien
04/13/2012 04:11 AM by
Ayende Rahien

Bunter, During the async operation, I can give up my own thread and do something else in it.

Bunter
04/13/2012 01:29 PM by
Bunter

I still fail to grasp the difference - once you've given out the ball (instructions to run a task with a loop and bunch of calls) to TPL, how is it different for your main thread how this async task executes?

Ayende Rahien
04/14/2012 12:14 AM by
Ayende Rahien

Bunter, let us say that I have only 2 threads in my system. If I am holding one while waiting for an async operation, that is going to cause a lot of waiting, and will hold up the entire thread. If I am using async operations, I am NOT holding a thread while the OS is doing the actual IO.

Omer Mor
04/14/2012 11:14 AM by
Omer Mor

Oren, regarding Rx: with the latest v2.0 beta version you can write your query like this:

var result = await Observable
    .Range(0, 10 * 1000)
    .ObserveOn(TaskPoolScheduler.Default)
    .Select(i => Observable.FromAsync(() => fetchDataAsync(i)))
    .Concat()
    .Sum();

This query will run the async operation one at a time, in the right order. For explanation - see this thread in the Rx forum (where I posted your question): http://social.msdn.microsoft.com/Forums/en-US/rx/thread/f7171f3b-523a-4df7-a134-d7bf633cdc55.

I generalized your case and wrote a simple SelectAsync operator:

// Extension method, so it has to be declared in a static class.
public static IObservable<R> SelectAsync<T,R>(this IObservable<T> source, Func<T,Task<R>> asyncSelector)
{
    return source
        .Select(value => Observable.FromAsync(() => asyncSelector(value)))
        .Concat();
}

This allows the following simple query:

var result = await Observable
    .Range(0, 10 * 1000)
    .ObserveOn(TaskPoolScheduler.Default)
    .SelectAsync(i => fetchDataAsync(i))
    .Sum();
gandjustas
04/14/2012 04:12 PM by
gandjustas

Bunter, for the caller there is no difference in how an asynchronous call executes. But for the whole system, creating new threads is a very expensive operation. Async IO helps handle massive concurrency with a small number of threads.

Async IO allows you to create scalable systems.
