Ayende @ Rahien

Challenge: The problem of locking down tasks…

The following code has a very subtle bug:

    public class AsyncQueue
    {
        private readonly Queue<int> items = new Queue<int>();
        private volatile LinkedList<TaskCompletionSource<object>> waiters = new LinkedList<TaskCompletionSource<object>>();

        public void Enqueue(int i)
        {
            lock (items)
            {
                items.Enqueue(i);
                while (waiters.First != null)
                {
                    waiters.First.Value.TrySetResult(null);
                    waiters.RemoveFirst();
                }
            }
        }

        public async Task<IEnumerable<int>> DrainAsync()
        {
            while (true)
            {
                TaskCompletionSource<object> taskCompletionSource;
                lock (items)
                {
                    if (items.Count > 0)
                    {
                        return YieldAllItems();
                    }
                    taskCompletionSource = new TaskCompletionSource<object>();
                    waiters.AddLast(taskCompletionSource);
                }
                await taskCompletionSource.Task;
            }
        }

        private IEnumerable<int> YieldAllItems()
        {
            while (items.Count > 0)
            {
                yield return items.Dequeue();
            }
        }
    }

I’ll even give you a hint. Try running the following client code:

    var q = new AsyncQueue();
    for (int i = 0; i < 1000 * 1000; i++)
    {
        q.Enqueue(i);
        if (i % 100 == 0)
        {
            Task.Factory.StartNew(async () =>
                {
                    foreach (var result in await q.DrainAsync())
                    {
                        Console.WriteLine(result);
                    }
                });
        }
    }

Can you figure out what the problem is?

Comments

05/27/2013 09:52 AM by tobi

The enumeration happens without the lock being held.

05/27/2013 11:32 AM by Duarte Nunes

The problem seems to be that you are returning an iterator, which won't actually consume the items until it is iterated over. By the time that happens, the items collection is no longer locked.

(The waiters field doesn't need to be volatile; maybe you meant readonly?)
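
The deferred execution described in the comments above can be seen in isolation. The following is only a minimal sketch, not the post's code: DeferredIteratorDemo, YieldAll and Run are hypothetical names, and the queue is filled up front so that the result is deterministic.

```csharp
using System;
using System.Collections.Generic;

public static class DeferredIteratorDemo
{
    // Hypothetical stand-in for YieldAllItems: an iterator method over a shared queue.
    private static IEnumerable<int> YieldAll(Queue<int> items)
    {
        while (items.Count > 0)
        {
            yield return items.Dequeue();
        }
    }

    // Returns the queue's count right after the iterator is created (inside the lock)
    // and again after the iterator has been enumerated (outside the lock).
    public static (int afterCall, int afterEnumeration) Run()
    {
        var items = new Queue<int>(new[] { 1, 2, 3 });
        IEnumerable<int> pending;

        lock (items)
        {
            // Nothing is dequeued here; the iterator body has not started running.
            pending = YieldAll(items);
        }
        int afterCall = items.Count;

        // The Dequeue calls happen only now, with the lock already released.
        foreach (var item in pending) { }
        int afterEnumeration = items.Count;

        return (afterCall, afterEnumeration);
    }
}
```

Calling DeferredIteratorDemo.Run() should report that all three items are still queued after the lock block and that they are gone only after the foreach.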

05/27/2013 12:44 PM by Cristi Ursachi

I think in general it is best to avoid using lock in an async method and to use the "async way" of locking instead. A solution would be:

    private static readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(initialCount: 1);

    public async Task<IEnumerable<int>> DrainAsync()
    {
        while (true)
        {
            TaskCompletionSource<object> taskCompletionSource;

            await semaphoreSlim.WaitAsync();
            try
            {
                if (items.Count > 0)
                {
                    return YieldAllItems();
                }
                taskCompletionSource = new TaskCompletionSource<object>();
                waiters.AddLast(taskCompletionSource);

                await taskCompletionSource.Task;
            }
            finally
            {
                semaphoreSlim.Release();
            }
        }
    }

05/27/2013 12:53 PM by hangy

Also, if locking is required, I would probably not lock on the items queue directly, but on ((ICollection)items).SyncRoot.
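
For readers who have not seen this idiom, here is a small sketch of locking on the SyncRoot object rather than on the queue instance. SyncRootDemo and EnqueueUnderSyncRoot are hypothetical names, and the sketch only illustrates the syntax; it does not address the bug in the post.

```csharp
using System.Collections;
using System.Collections.Generic;

public static class SyncRootDemo
{
    // Enqueues one item while holding the queue's SyncRoot and returns the new count.
    public static int EnqueueUnderSyncRoot()
    {
        var items = new Queue<int>();

        // Queue<T> implements the non-generic ICollection explicitly,
        // so a cast is needed to reach SyncRoot.
        object gate = ((ICollection)items).SyncRoot;

        lock (gate)
        {
            items.Enqueue(1);
            return items.Count;
        }
    }
}
```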

05/27/2013 05:19 PM by marinehero

Surely we need to pass the current Task state on to the TaskCompletionSource in its constructor in DrainAsync()?

Change to:

    taskCompletionSource = new TaskCompletionSource<object>(this);
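
To make the effect of that constructor argument concrete, here is a small sketch. As far as I can tell, the state object is simply stored and exposed as Task.AsyncState, and it does not add any synchronization. CompletionSourceStateDemo and its method are hypothetical names, and owner stands in for this.

```csharp
using System.Threading.Tasks;

public static class CompletionSourceStateDemo
{
    // Returns true when the state passed to the constructor
    // is the same object that the task exposes as AsyncState.
    public static bool StateIsExposedOnTask()
    {
        var owner = new object(); // hypothetical stand-in for `this`
        var tcs = new TaskCompletionSource<object>(owner);
        return ReferenceEquals(tcs.Task.AsyncState, owner);
    }
}
```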

05/27/2013 05:49 PM by Alois Kraus

You are returning an iterator that drains the queue without taking a lock. The lock in DrainAsync is released as soon as you return the iterator, so the check for Count > 0 followed by a Dequeue can race with another consumer and lead to an InvalidOperationException telling you that the queue is empty.
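
The interleaving behind that exception can be replayed on a single thread. This is a deliberately simplified sketch with hypothetical names (CheckThenDequeueDemo, consumer A and consumer B); in the real program the two consumers run on different threads, and since Queue<T> is not thread-safe, other failures may be possible as well.

```csharp
using System;
using System.Collections.Generic;

public static class CheckThenDequeueDemo
{
    // Replays, step by step, two unsynchronized consumers sharing one queue.
    public static bool SecondDequeueThrows()
    {
        var items = new Queue<int>();
        items.Enqueue(42);

        // Consumer A passes the check that guards its Dequeue call.
        bool consumerASawItems = items.Count > 0;

        // Consumer B runs in between and takes the only item.
        items.Dequeue();

        try
        {
            if (consumerASawItems)
            {
                // Consumer A acts on its stale check; the queue is empty by now.
                items.Dequeue();
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```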

06/07/2013 01:18 PM by Erik

I think the simplest way to resolve this is to append .ToArray() to the YieldAllItems() call in the return statement. This consumes the enumerable and drains the queue inside the lock, as desired.
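
Applied to the class from the post, that suggestion might look roughly like the sketch below. MaterializingAsyncQueue is a hypothetical name, and making waiters readonly instead of volatile follows the earlier comment; this is one possible reading of the fix, not necessarily the author's own solution.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class MaterializingAsyncQueue
{
    private readonly Queue<int> items = new Queue<int>();
    private readonly LinkedList<TaskCompletionSource<object>> waiters =
        new LinkedList<TaskCompletionSource<object>>();

    public void Enqueue(int i)
    {
        lock (items)
        {
            items.Enqueue(i);
            // Wake every drainer that is waiting for items.
            while (waiters.First != null)
            {
                waiters.First.Value.TrySetResult(null);
                waiters.RemoveFirst();
            }
        }
    }

    public async Task<IEnumerable<int>> DrainAsync()
    {
        while (true)
        {
            TaskCompletionSource<object> taskCompletionSource;
            lock (items)
            {
                if (items.Count > 0)
                {
                    // ToArray enumerates the iterator right here, so every
                    // Dequeue call runs while the lock is still held.
                    return YieldAllItems().ToArray();
                }
                taskCompletionSource = new TaskCompletionSource<object>();
                waiters.AddLast(taskCompletionSource);
            }
            await taskCompletionSource.Task;
        }
    }

    private IEnumerable<int> YieldAllItems()
    {
        while (items.Count > 0)
        {
            yield return items.Dequeue();
        }
    }
}
```

The caller now receives an array that no longer touches the shared queue, so enumerating it outside the lock is harmless.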
