Ayende @ Rahien

Hi!
My name is Oren Eini
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969

, @ Q c

Posts: 5,972 | Comments: 44,508

filter by tags archive

ChallengeThe problem of locking down tasks…


The following code has very subtle bug:

   1: public class AsyncQueue
   2: {
   3:     private readonly Queue<int> items = new Queue<int>();
   4:     private volatile LinkedList<TaskCompletionSource<object>> waiters = new LinkedList<TaskCompletionSource<object>>();
   5:  
   6:     public void Enqueue(int i)
   7:     {
   8:         lock (items)
   9:         {
  10:             items.Enqueue(i);
  11:             while (waiters.First != null)
  12:             {
  13:                 waiters.First.Value.TrySetResult(null);
  14:                 waiters.RemoveFirst();
  15:             }
  16:         }
  17:     }
  18:  
  19:     public async Task<IEnumerable<int>> DrainAsync()
  20:     {
  21:         while (true)
  22:         {
  23:             TaskCompletionSource<object> taskCompletionSource;
  24:             lock (items)
  25:             {
  26:                 if (items.Count > 0)
  27:                 {
  28:                     return YieldAllItems();
  29:                 }
  30:                 taskCompletionSource = new TaskCompletionSource<object>();
  31:                 waiters.AddLast(taskCompletionSource);
  32:             }
  33:             await taskCompletionSource.Task;
  34:         }
  35:     }
  36:  
  37:     private IEnumerable<int> YieldAllItems()
  38:     {
  39:         while (items.Count > 0)
  40:         {
  41:             yield return items.Dequeue();
  42:         }
  43:  
  44:     }
  45: }

I’ll even give you a hint, try to run the following client code:

   1: for (int i = 0; i < 1000 * 1000; i++)
   2: {
   3:     q.Enqueue(i);
   4:     if (i%100 == 0)
   5:     {
   6:         Task.Factory.StartNew(async () =>
   7:             {
   8:                 foreach (var result in await q.DrainAsync())
   9:                 {
  10:                     Console.WriteLine(result);
  11:                 }
  12:             });
  13:     }
  14:  
  15: }
Can you figure out what the problem is?

More posts in "Challenge" series:

  1. (28 Apr 2015) What is the meaning of this change?
  2. (26 Sep 2013) Spot the bug
  3. (27 May 2013) The problem of locking down tasks…
  4. (17 Oct 2011) Minimum number of round trips
  5. (23 Aug 2011) Recent Comments with Future Posts
  6. (02 Aug 2011) Modifying execution approaches
  7. (29 Apr 2011) Stop the leaks
  8. (23 Dec 2010) This code should never hit production
  9. (17 Dec 2010) Your own ThreadLocal
  10. (03 Dec 2010) Querying relative information with RavenDB
  11. (29 Jun 2010) Find the bug
  12. (23 Jun 2010) Dynamically dynamic
  13. (28 Apr 2010) What killed the application?
  14. (19 Mar 2010) What does this code do?
  15. (04 Mar 2010) Robust enumeration over external code
  16. (16 Feb 2010) Premature optimization, and all of that…
  17. (12 Feb 2010) Efficient querying
  18. (10 Feb 2010) Find the resource leak
  19. (21 Oct 2009) Can you spot the bug?
  20. (18 Oct 2009) Why is this wrong?
  21. (17 Oct 2009) Write the check in comment
  22. (15 Sep 2009) NH Prof Exporting Reports
  23. (02 Sep 2009) The lazy loaded inheritance many to one association OR/M conundrum
  24. (01 Sep 2009) Why isn’t select broken?
  25. (06 Aug 2009) Find the bug fixes
  26. (26 May 2009) Find the bug
  27. (14 May 2009) multi threaded test failure
  28. (11 May 2009) The regex that doesn’t match
  29. (24 Mar 2009) probability based selection
  30. (13 Mar 2009) C# Rewriting
  31. (18 Feb 2009) write a self extracting program
  32. (04 Sep 2008) Don't stop with the first DSL abstraction
  33. (02 Aug 2008) What is the problem?
  34. (28 Jul 2008) What does this code do?
  35. (26 Jul 2008) Find the bug fix
  36. (05 Jul 2008) Find the deadlock
  37. (03 Jul 2008) Find the bug
  38. (02 Jul 2008) What is wrong with this code
  39. (05 Jun 2008) why did the tests fail?
  40. (27 May 2008) Striving for better syntax
  41. (13 Apr 2008) calling generics without the generic type
  42. (12 Apr 2008) The directory tree
  43. (24 Mar 2008) Find the version
  44. (21 Jan 2008) Strongly typing weakly typed code
  45. (28 Jun 2007) Windsor Null Object Dependency Facility

Comments

tobi

The enumeration happens without the lock being held.

Duarte Nunes

The problem seems to be that you are returning an iterator, which won't actually consume the items until it's iterated over - and when it happens, the items collection won't be locked.

(The waiters doesn't need to be volatile; maybe you meant readonly?)

Cristi Ursachi

I think in general is best if we avoid using lock in an async method "async way" of locking. A solution would be:

private static readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(initialCount: 1);

public async Task<IEnumerable> DrainAsync() { while (true) { TaskCompletionSource taskCompletionSource;

        await semaphoreSlim.WaitAsync(); 
     try
        {
              if (items.Count > 0)
            {
                 return YieldAllItems();
             }
              taskCompletionSource = new TaskCompletionSource<object>();
           waiters.AddLast(taskCompletionSource);

          await taskCompletionSource.Task;
    }

finally{semaphoreSlim.Release();} } }

hangy

Also, if locking is required, I would probably not lock on the items queue directly, but on ((ICollection)items).SyncRoot.

marinehero

Surely we need to pass the current Task state on to the TaskCompletionSource in it's constructor in DrainAsync() ?

change

taskCompletionSource = new TaskCompletionSource(this);

Alois Kraus

You are returning an iterator which does drain the queue without taking a lock. The check for the count > 0 followed by a deque will lead to an InvalidOperationException telling you that the queue is empty. The lock in DrainAsync is immediately released when you return it.

Erik

I think the simplest way to resolve this is to simply add a .ToArray() call to the end of return YieldAllItems(); - this will consume the enumerable and drain the queue inside the lock as desired.

Comment preview

Comments have been closed on this topic.

FUTURE POSTS

  1. Paying the rent online - 9 hours from now
  2. Reducing parsing costs in RavenDB - about one day from now

There are posts all the way to Aug 04, 2015

RECENT SERIES

  1. Production postmortem (5):
    29 Jul 2015 - The evil licensing code
  2. Career planning (6):
    24 Jul 2015 - The immortal choices aren't
  3. API Design (7):
    20 Jul 2015 - We’ll let the users sort it out
  4. What is new in RavenDB 3.5 (3):
    15 Jul 2015 - Exploring data in the dark
  5. The RavenDB Comic Strip (3):
    28 May 2015 - Part III – High availability & sleeping soundly
View all series

Syndication

Main feed Feed Stats
Comments feed   Comments Feed Stats