Ayende @ Rahien

Refunds available at head office

Async Read Challenge

Originally posted at 11/11/2010

Here is how you are supposed to read from a stream:

var buffer = new byte[bufferSize];
int read = -1;
int start = 0;
while(read != 0)
{
   read = stream.Read(buffer, start, buffer.Length - start);
   start += read;
}

The reason we do it this way is that the stream might not have all the data available yet, so a single Read call may return fewer bytes than we asked for. We have to keep reading until Read returns 0.
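To make the partial-read behavior concrete, here is a minimal sketch. TrickleStream is a hypothetical class invented for this illustration (it is not part of the framework); it simply caps every Read call at 3 bytes, the way a network stream might hand back data in small pieces.

```csharp
using System;
using System.IO;

// Hypothetical stream (an assumption for this sketch): never returns
// more than 3 bytes per Read call, however many were requested.
public class TrickleStream : MemoryStream
{
    public TrickleStream(byte[] data) : base(data) { }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return base.Read(buffer, offset, Math.Min(count, 3));
    }
}

public static class PartialReadDemo
{
    public static void Main()
    {
        var stream = new TrickleStream(new byte[10]);
        var buffer = new byte[16];
        int read = -1;
        int start = 0;
        int calls = 0;
        while (read != 0)
        {
            // Same loop as above: keep asking until the stream reports 0.
            read = stream.Read(buffer, start, buffer.Length - start);
            start += read;
            calls++;
        }
        Console.WriteLine(start + " bytes in " + calls + " calls");
        // prints "10 bytes in 5 calls" (reads of 3, 3, 3, 1 and 0 bytes)
    }
}
```

A single Read call here would have returned only 3 of the 10 bytes, which is exactly the case the loop guards against.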

The question is how to do this in an asynchronous manner. Asynchronous loops are… tough, to say the least. Mostly because you have to handle the state explicitly.

Here is how you can do this using the new Async CTP in C# 5.0:

private async static Task<Tuple<byte[], int>> ReadBuffer(Stream s, int bufferSize)
{
    var buffer = new byte[bufferSize];
    int read = -1;
    int start = 0;
    while (read != 0)
    {
        read = await Task.Factory.FromAsync<byte[],int,int, int>(
            s.BeginRead, 
            s.EndRead, 
            buffer, 
            start, 
            buffer.Length -1, 
            null);
        start += read;
    }
    return Tuple.Create(buffer, start);
}

Now, here is the challenge: using just the TPL API, and without any C# 5.0 features, can you write the same thing?

Comments

Duarte Nunes
11/14/2010 12:19 PM by
Duarte Nunes

public static Task<Tuple<byte[], int>> ReadBuffer(Stream s, int bufferSize)
{
    var buffer = new byte[bufferSize];
    int start = 0;

    var tcs = new TaskCompletionSource<Tuple<byte[], int>>();
    Action<Task<int>> cont = null;
    cont = t =>
    {
        if (t != null)
        {
            if (t.Exception != null)
            {
                tcs.SetException(t.Exception);
                return;
            }
            if (t.Result == 0)
            {
                tcs.SetResult(Tuple.Create(buffer, start));
                return;
            }

            start += t.Result;
        }

        Task.Factory.FromAsync<byte[], int, int, int>(
            s.BeginRead,
            s.EndRead,
            buffer,
            start,
            buffer.Length - 1,
            null).ContinueWith(cont);
    };

    cont(null);

    return tcs.Task;
}

Daniel Grunwald
11/14/2010 12:32 PM by
Daniel Grunwald

I've had to do something like that years ago.

I don't have that code anymore, but I think my solution looked similar to this (sorry, no TPL there):

class BufferReader
{
    byte[] buffer;
    int pos;
    Stream stream;
    Exception exception;

    public BufferReader(Stream stream, int bufferSize)
    {
        this.stream = stream;
        this.buffer = new byte[bufferSize];
    }

    public void BeginRead()
    {
        stream.BeginRead(buffer, pos, buffer.Length - pos, Callback, null);
    }

    void Callback(IAsyncResult r)
    {
        int read;
        try {
            read = stream.EndRead(r);
        } catch (Exception ex) {
            exception = ex;
            if (ReadError != null)
                ReadError(this, EventArgs.Empty);
            return;
        }
        if (read == 0) {
            if (FinishedReading != null)
                FinishedReading(this, EventArgs.Empty);
        } else {
            pos += read;
            BeginRead();
        }
    }

    public event EventHandler FinishedReading;
    public event EventHandler ReadError;

    // + properties that allow the caller to access buffer,pos,exception
}

Usage of that class: create an instance, attach event handlers, then call BeginRead().

BTW, your code is wrong, you should use «buffer.Length - start» instead of «buffer.Length - 1».
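A minimal sketch of that correction applied to the method from the post. With «buffer.Length - 1», any call made once start has reached 2 or more asks for a range that runs past the end of the buffer, which streams typically reject with an ArgumentException. The sketch assumes the released C# 5.0 async/await syntax rather than the CTP, and uses a MemoryStream purely as a stand-in source; the class name CorrectedRead is made up for the example.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class CorrectedRead
{
    public static async Task<Tuple<byte[], int>> ReadBuffer(Stream s, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        int read = -1;
        int start = 0;
        while (read != 0)
        {
            read = await Task.Factory.FromAsync<byte[], int, int, int>(
                s.BeginRead,
                s.EndRead,
                buffer,
                start,
                buffer.Length - start,   // the space that is left, not Length - 1
                null);
            start += read;
        }
        return Tuple.Create(buffer, start);
    }

    public static void Main()
    {
        // Stand-in source for the sketch: 5 bytes read into an 8-byte buffer.
        var source = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        Tuple<byte[], int> result = ReadBuffer(source, 8).Result;
        Console.WriteLine(result.Item2);   // prints 5
    }
}
```

Note that when the buffer fills up, the requested count becomes 0, the read returns 0, and the loop ends, so a full buffer is indistinguishable from the end of the stream here, just as in the original loop.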

g
11/14/2010 04:19 PM by
g

The C# 5.0 async usage looks so ... ugly to me. I cannot articulate why though, it just seems rather esoteric.

anon
11/14/2010 10:00 PM by
anon

Search for

"Sequential Asynchronous Workflows in Silverlight using Coroutines"

or

"Rob Eisenberg's MVVM presentation from MIX10", which introduces that idea.

Ollie
11/14/2010 10:17 PM by
Ollie

+1 Omar

Paul Stovell
11/15/2010 12:40 AM by
Paul Stovell

ThreadPool.QueueUserWorkItem(
_ => {
  // The code you wrote above ;)
});

Except you won't get the benefits of IOCP.

Ajai Shankar
11/15/2010 02:05 AM by
Ajai Shankar

You might want to look at this too, Ayende:

http://easyasync.codeplex.com/

This was inspired over 6 years back from the good old state threads library.

Now we have the Async CTP, node.js and Tornado, all dealing with non-blocking IO...

Also do see the links at bottom...

Mihailo
11/15/2010 04:27 PM by
Mihailo

I might be wrong, but isn't the C# 5 version pretty much the same as:

var buffer = new byte[bufferSize];
int read = -1;
int start = 0;
while(read != 0)
{
    var ar = stream.BeginRead(buffer, start, buffer.Length - start, null, null);
    read = stream.EndRead(ar);
    start += read;
}

The code will block on await; here it will block on EndRead, no?

Kiran
11/15/2010 06:50 PM by
Kiran

Using Jeff Richter's AsyncEnumerator class:

void Main()
{
    var ae = new AsyncEnumerator();
    ae.EndExecute(ae.BeginExecute
                    (ProcessFile(ae, @"filepath", 8192), null));
}

private static IEnumerator<Int32> ProcessFile(AsyncEnumerator ae, string pathName, Int32 bufferSize)
{
    using(FileStream fs =
            new FileStream(pathName, FileMode.Open, FileAccess.Read,
            FileShare.Read, bufferSize, FileOptions.Asynchronous))
    {
        Byte[] data = new Byte[fs.Length];
        fs.BeginRead(data, 0, data.Length, ae.End(), null);
        yield return 1;

        fs.EndRead(ae.DequeueAsyncResult());
        Console.WriteLine(UTF8Encoding.UTF8.GetString(data));
    }
}

Harry Steinhilber
11/15/2010 10:10 PM by
Harry Steinhilber

@Mihailo,

The C# 5 code will not block on await. If a result is immediately available, it will continue; if it is not, it signs the rest of the method up as a continuation and returns to the caller. The continuation will be called when the result becomes available. See Eric Lippert's excellent series of explanations.

blogs.msdn.com/.../c2300+5-0/
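A minimal sketch of the behavior described above, assuming the released C# 5.0 syntax rather than the CTP. The names AwaitDemo and ReadLater are made up for the illustration, and a TaskCompletionSource stands in for a read that has not finished yet.

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitDemo
{
    // Awaits a task that is not complete yet and logs what happens around it.
    public static async Task<int> ReadLater(Task<int> pending, Action<string> log)
    {
        log("before await");
        int value = await pending;   // not complete: control returns to the caller here
        log("after await");          // runs later, as the continuation
        return value;
    }

    public static void Main()
    {
        // Stand-in for a slow read: the task completes only when we say so.
        var tcs = new TaskCompletionSource<int>();

        Task<int> result = ReadLater(tcs.Task, Console.WriteLine);

        // We get here although ReadLater has not finished: await did not block.
        Console.WriteLine("caller resumed, completed = " + result.IsCompleted);

        tcs.SetResult(42);           // the "read" finishes, the continuation can run
        Console.WriteLine("result = " + result.Result);
    }
}
```

Run as a console program, this prints "before await", then the caller's line with completed = False, and only after SetResult is called does "after await" appear, followed by "result = 42".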

Mihailo
11/16/2010 11:17 AM by
Mihailo

@Harry Steinhilber

Thanks for that. The second installment should have explained it, but it doesn't work for me: it explains how it's made, but it becomes too technical for my level of knowledge of C# 5 features. If you look at Jon Skeet's post:

msmvps.com/.../...-investigating-control-flow.aspx

He explains it as if to an 8-year-old. And yes, you're correct, it is a bit different, but only for the caller of the ReadBuffer method. The code I laid out is a bit of an oversimplification; to work the same way it would with async and await, it needs collaboration from the method's caller.

I believe this code will have the same effect as C#5 version:

private static Tuple<byte[], int> ReadBuffer(Stream s, int bufferSize)
{
    var buffer = new byte[bufferSize];
    int read = -1;
    int start = 0;
    while(read != 0)
    {
        var ar = s.BeginRead(buffer, start, buffer.Length - start, null, null);
        read = s.EndRead(ar);
        start += read;
    }
    return Tuple.Create(buffer, start);
}

Func<Stream, int, Tuple<byte[], int>> readBuffer = (s, bufferSize) => ReadBuffer(s, bufferSize);
var ar = readBuffer.BeginInvoke(null, null);
... do something ...
var data = readBuffer.EndInvoke(ar);

I understand this seems uglier, especially for the caller, but that's probably the reason why the new language features were introduced. Also, I understand this code is not a 1-to-1 equivalent; I'm just saying it will have the same effect for the example given by Oren.

Demis Bellot
11/16/2010 12:29 PM by
Demis Bellot

@Mihailo

I think you're missing the point. The C# 5.0 async features execute on the same thread, whereas calling BeginInvoke/EndInvoke executes the logic asynchronously on a ThreadPool thread.

This is a very important difference and is the main purpose of the async/await language features, which allow you to easily develop composable async / non-blocking logic on the same thread.

As soon as you introduce threading you introduce concurrency issues as well as additional overhead of context switching between threads.

- Demis

Mihailo
11/16/2010 02:50 PM by
Mihailo

@Demis Bellot

I'm not arguing that async/await isn't different, or better; that's why I said it will have nearly the same effect in this particular example, not in general.

Mind, though, that you may still have code executed on a different thread; you may still have a ThreadPool thread in the background (or something similar). How do you think the continuation is executed? How do you think callbacks are executed? Language additions are not making multi-threading magically disappear; they are just making it easier for us to develop and manage it.

Demis Bellot
11/16/2010 03:28 PM by
Demis Bellot

@Mihailo

"Mind, though, that you may still have code executed on a different thread; you may still have a ThreadPool thread in the background (or something similar)."

This is incorrect and is the main point of C# 5.0 async/await. No secret background thread is spawned to achieve the behaviour. It's all done by the compiler wrapping the logic that follows the await in a callback, which is executed as part of a generated state machine (similar to how yield works):

"Whenever a task is "awaited", the remainder of the current method is signed up as a continuation of the task, and then control immediately returns to the caller. When the task completes, the continuation is invoked and the method starts up where it was before."

So no background threads - all your logic is executed on the same thread.

I recommend the following post for some more detailed info:

blogs.msdn.com/.../asynchrony-in-c-5-part-one.aspx

As well as Anders talk on the subject:

player.microsoftpdc.com/.../1b127a7d-300e-4385-...

- Demis

Mihailo
11/16/2010 07:20 PM by
Mihailo

@Demis

We might be talking about different things here. The default implementation of Stream.BeginRead will execute:

return delegate2.BeginInvoke(buffer, offset, count, callback, state);

That's the output from Reflector, so there will be a new thread involved.

I'm just guessing that what you're referring to is my line:

var ar = readBuffer.BeginInvoke(null, null);

The compiler will not create a new thread like I did there.

Demis Bellot
11/16/2010 08:22 PM by
Demis Bellot

Hi @Mihailo

I was just saying that calling a delegate concurrently like this:

var ar = readBuffer.BeginInvoke(null, null);

executes it on a ThreadPool thread.

Jeff Cyr
11/16/2010 11:11 PM by
Jeff Cyr

Here is a version that uses the Task Iterator provided in the TPL Samples:

http://code.msdn.microsoft.com/ParExtSamples

    public static Task<byte[]> ReadAsync(Stream stream, int length)
    {
        byte[] buffer = new byte[length];

        return Task.Factory.Iterate(ReadBufferIterator(stream, buffer, length))
                           .ContinueWith(_ => buffer);
    }

    private static IEnumerable<Task> ReadBufferIterator(Stream stream, byte[] buffer, int length)
    {
        int offset = 0;

        while (offset != length)
        {
            Task<int> readTask = stream.ReadAsync(buffer, offset, length - offset);
            yield return readTask;
            int readBytes = readTask.Result;

            if (readBytes == 0)
                throw new IOException("Read 0 byte. The stream has been closed.");

            offset += readBytes;
        }
    }

    public static class StreamEx
    {
        public static Task<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int length)
        {
            return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, offset, length, null);
        }
    }

Comments have been closed on this topic.