Ayende @ Rahien

It's a girl

TPL: Composing tasks

What happens when you want to compose two distinct async operations into a single Task?

For example, let us imagine that we want to have a method that looks like this:

public void ConnectToServer()
{
    var connection = ServerConnection.CreateServerConnection(); // tcp connect
    connection.HandShakeWithServer(); // application level handshake
}

Now, we want to make this method async, using TPL. We can do this by changing the methods to return Task, so the API we have now is:

Task<ServerConnection> CreateServerConnectionAsync();
Task HandShakeWithServerAsync(); // instance method on ServerConnection

And we can now write the code like this:

public Task ConnectToServerAsync()
{
   return ServerConnection.CreateServerConnectionAsymc()
                 .ContinueWith(task => task.Result.HandShakeWithServerAsync());
}

There is just one problem with this approach, the task that we are returning is the first task, because the second task cannot be called as a chained ContinueWith.

We are actually returning a Task<Task>, so we can use task.Result.Result to wait for the final operation, but that seems like a very awkward API.

The challenge is figuring a way to compose those two operations in a way that expose only a single task.

Comments

Ayende Rahien
11/16/2010 08:20 AM by
Ayende Rahien

KAE,

Damn it!

I was sure that I had such an elegant solution, and then you come up with this beauty.

Thanks!

Jes&#250;s L&#243;pez
11/16/2010 09:05 AM by
Jesús López

However there is a subtle problem. If CreateServerConnection fails you will end up with an unobserved exception which will cause the entire app domain to crash if you don't catch unobserved exception in your app domain.

I use the following task extensions for composing tasks and observe antecedents:

    public static Task

<tresult ObservedContinueWith <tresult(this Task task, Func <task,> continuationFunction, TaskContinuationOptions continuationOptions)

    {

        return task.ContinueWith(t =>

        {

            if (t.Status == TaskStatus.Faulted)

            {

                throw t.Exception;

            }

            return continuationFunction(t);

        }, continuationOptions);

    }
Ayende Rahien
11/16/2010 09:11 AM by
Ayende Rahien

Jesus,

What are you talking about?

Exceptions inside tasks will not kill the app domain

Jes&#250;s L&#243;pez
11/16/2010 09:35 AM by
Jesús López

Yes it will. Try this code and see how the console application crashes:

    static void Main(string[] args)

    {

        try

        {

            var t = GetComposedTask();

            Thread.Sleep(1000);

            t.Wait();

            GC.Collect();

            GC.WaitForPendingFinalizers();

            Console.WriteLine("Press enter");

        }

        catch (Exception ex)

        {

            Console.WriteLine("Caugth exception {0}", ex.Message);

        }

    }


    static Task GetComposedTask()

    {

        return Task.Factory.StartNew(() => { throw new Exception("Some error ocurred"); })

            .ContinueWith( _ => { Console.WriteLine("Hello from task"); });



    }
Ayende Rahien
11/16/2010 09:43 AM by
Ayende Rahien

Jesus,

Thanks.

The actual exception is very informative:

A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread.

Which makes total sense.

Jes&#250;s L&#243;pez
11/16/2010 09:47 AM by
Jesús López

I forgot to include Console.ReadLine after Console.WriteLine. Anyway, it crashes

Omer Mor
11/16/2010 07:05 PM by
Omer Mor

If you'd go the Rx path, composition is trivial:

    Task CreateServerConnectionAsync()

    {

        return (from sc in ServerConnection.CreateServerConnectionAsync().ToObservable()

                select sc.HandShakeWithServerAsync()).Single();

    }

Instead of a Task, you can return an observable of unit (sort of void), and have more native Rx signature:

    IObservable

<unit CreateServerConnectionAsync()

    {

        return from sc in ServerConnection.CreateServerConnectionAsync().ToObservable()

               select sc.HandShakeWithServerAsync().ToObservable();

    }

If your whole API will return observables, you could also skip the .ToObservable() task conversions.

Dan Jasek
11/16/2010 08:30 PM by
Dan Jasek

I think Omer has the best solution. I have been underwhelmed by the implementation of Task. Cold tasks are basically useless, they forgot to make an ITask... And now Jesus' exception handling issue.

RX doesn't have any of these problems, including the need to unwrap. It is just better thought out.

I hope they add some sort of iterative await for RX before they release 5.0.

Jeff Cyr
11/16/2010 11:24 PM by
Jeff Cyr

@jesus

Note that you can prevent the application crash by subscribing to TaskScheduler.UnobservedTaskException:

        TaskScheduler.UnobservedTaskException += (sender, e) =>

        {

            Console.WriteLine(e.Exception.ToString());

            e.SetObserved();

        };
Jonas
11/17/2010 07:16 AM by
Jonas

Use a TaskCompletionSource and set up your method to return you tc.Task and then wrap your async methods neatly to call tc.TrySetException | TrySetResult when they complete..

Tada...

David Hanson
11/19/2010 06:19 AM by
David Hanson

I "Subscribe" ;-) to the Rx approach.

We use the approach outlined by Omar to load data from mutliple domains services in Ria and works very well.

Whut
11/22/2010 10:11 AM by
Whut

I'm definitely missing something, but why don't you just put CreateServerConnection and HandShakeWithServer inside one task?

Comments have been closed on this topic.