Ayende @ Rahien

Hi!
My name is Ayende Rahien
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

@

Posts: 5,949 | Comments: 44,546

filter by tags archive

Rhino ETL Union Operation


Yes, it is somewhat of a blast from the past, but I just got asked how to create a good Union All operation for Rhino ETL.

The obvious implementation is:

   1: public class UnionAllOperation : AbstractOperation
   2: {
   3:     private readonly List<IOperation> _operations = new List<IOperation>(); 
   4:  
   5:     public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
   6:     {
   7:         foreach (var operation in _operations)
   8:             foreach (var row in operation.Execute(null))
   9:                 yield return row;
  10:     }
  11:  
  12:     public UnionAllOperation Add(IOperation operation)
  13:     {
  14:         _operations.Add(operation);
  15:         return this;
  16:     }
  17: }

The problem is that this does everything synchronously. The following code is a better impl, but note that this is notepad code, with all the implications of that.

   1: public class UnionAllOperation : AbstractOperation
   2: {
   3:     private readonly List<IOperation> _operations = new List<IOperation>(); 
   4:  
   5:     public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
   6:     {
   7:         var blockingCollection = new BlockingCollection<Row>();
   8:         var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() =>{
   9:                 foreach(var operation in currentOp.Execute(null))
  10:                 {
  11:                     blockingCollection.Add(operation);
  12:                 }
  13:                 blockingCollection.Add(null); // free the consumer thread
  14:             });
  15:  
  16:         Row r;
  17:         while(true){
  18:             if(tasks.All(x=>x.IsFaulted || x.IsCanceled || x.IsCompleted)) // all done
  19:                 break;
  20:             r = blockingCollection.Take();
  21:             if(r == null)
  22:                 continue;
  23:             yield return r;
  24:         }
  25:         while(blockingCollection.TryTake(out r)) {
  26:             if(r == null)
  27:                 continue;
  28:             yield return r;
  29:         }
  30:         Task.WaitAll(tasks.ToArray()); // raise any exception that were raised during execption
  31:     }
  32:  
  33:     public UnionAllOperation Add(IOperation operation)
  34:     {
  35:         _operations.Add(operation);
  36:         return this;
  37:     }
  38: }

Usual caveats apply, notepad code, never actually run it, much less tested / debugged it.

Feel free to rip into it, though.

Dale Newman did some improvements, the most important one is to make sure that we aren’t going to evaluate the tasks several times (opps! I told ya it was notepad code Smile), and now it looks like this:

   1: /// <summary>
   2: /// Combines rows from all operations.
   3: /// </summary>
   4: public class UnionAllOperation : AbstractOperation {
   5:  
   6:     private readonly List<IOperation> _operations = new List<IOperation>();
   7:  
   8:     /// <summary>
   9:     /// Executes the added operations in parallel.
  10:     /// </summary>
  11:     /// <param name="rows"></param>
  12:     /// <returns></returns>
  13:     public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {
  14:  
  15:         var blockingCollection = new BlockingCollection<Row>();
  16:  
  17:         Debug("Creating tasks for {0} operations.", _operations.Count);
  18:  
  19:         var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() => {
  20:             Trace("Executing {0} operation.", currentOp.Name);
  21:             foreach (var row in currentOp.Execute(null)) {
  22:                 blockingCollection.Add(row);
  23:             }
  24:             blockingCollection.Add(null); // free the consumer thread
  25:         })).ToArray();
  26:  
  27:         Row r;
  28:         while (true) {
  29:             if (tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) {
  30:                 Debug("All tasks have been canceled, have faulted, or have completed.");
  31:                 break;
  32:             }
  33:  
  34:             r = blockingCollection.Take();
  35:             if (r == null)
  36:                 continue;
  37:  
  38:             yield return r;
  39:  
  40:         }
  41:  
  42:         while (blockingCollection.TryTake(out r)) {
  43:             if (r == null)
  44:                 continue;
  45:             yield return r;
  46:         }
  47:  
  48:         Task.WaitAll(tasks); // raise any exception that were raised during execption
  49:  
  50:     }
  51:  
  52:     /// <summary>
  53:     /// Initializes this instance
  54:     /// </summary>
  55:     /// <param name="pipelineExecuter">The current pipeline executer.</param>
  56:     public override void PrepareForExecution(IPipelineExecuter pipelineExecuter) {
  57:         foreach (var operation in _operations) {
  58:             operation.PrepareForExecution(pipelineExecuter);
  59:         }
  60:     }
  61:  
  62:     /// <summary>
  63:     /// Add operation parameters
  64:     /// </summary>
  65:     /// <param name="ops">operations delimited by commas</param>
  66:     /// <returns></returns>
  67:     public UnionAllOperation Add(params IOperation[] ops) {
  68:         foreach (var operation in ops) {
  69:             _operations.Add(operation);
  70:         }
  71:         return this;
  72:     }
  73:  
  74:     /// <summary>
  75:     /// Add operations
  76:     /// </summary>
  77:     /// <param name="ops">an enumerable of operations</param>
  78:     /// <returns></returns>
  79:     public UnionAllOperation Add(IEnumerable<IOperation> ops) {
  80:         _operations.AddRange(ops);
  81:         return this;
  82:     }
  83:  
  84: }

Comments

Rémi BOURGAREL

Isn't it the purpose of parrallel linq ?

Vladimir

Kinda stupid question, why not use GetConsumingEnumerable() + CompleteAdding, like this (https://gist.github.com/v2m/4969559).

Whut

Clever and simple code.

But I don't understand what is the purpose of: "blockingCollection.Add(null); // free the consumer thread"? Could someone explain?

Also, isn't there a race condition in the first while loop? Imagine last task that added all rows but not yet finished. In the mean time main thread consumes all rows. At this point check tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted) will return false, but since there will be no new rows added, call to blockingCollection.Take() will block forever.

Ayende Rahien

Whut, The whole point of the null is to avoid the race condition. Either all the tasks are done, in which case you'll have a null (freeing you from the Take()), or not.

Whut

I believe that this null helps when there are no rows returned by currentOp, but still race condition is there. Add something like if (currentOp.Name == "MarkerOperation") { Thread.Sleep(TimeSpan.FromSeconds(5)); } after blockingCollection.Add(null) to have it visible.

Dale Newman

I want to thank Ayende and the folks that commented on this blog post. I ended up using Vladimir's version in my SQLoogle project, and it works great!

If you're interested in Rhino ETL at all, I just finished an article on CodeProject about SQLoogle, which is mainly about using Ayende's Rhino ETL. It's here:

http://www.codeproject.com/Articles/573937/SQLoogle-Part-1-of-2

Thanks again :-)

Comment preview

Comments have been closed on this topic.

FUTURE POSTS

  1. The RavenDB Comic Strip: Part III – High availability & sleeping soundly - about one hour from now

There are posts all the way to May 28, 2015

RECENT SERIES

  1. The RavenDB Comic Strip (3):
    20 May 2015 - Part II – a team in trouble!
  2. Special Offer (2):
    27 May 2015 - 29% discount for all our products
  3. RavenDB Sharding (3):
    22 May 2015 - Adding a new shard to an existing cluster, splitting the shard
  4. Challenge (45):
    28 Apr 2015 - What is the meaning of this change?
  5. Interview question (2):
    30 Mar 2015 - fix the index
View all series

RECENT COMMENTS

Syndication

Main feed Feed Stats
Comments feed   Comments Feed Stats