Rhino ETL Union Operation
Yes, it is somewhat of a blast from the past, but I just got asked how to create a good Union All operation for Rhino ETL.
The obvious implementation is:
    public class UnionAllOperation : AbstractOperation
    {
        private readonly List<IOperation> _operations = new List<IOperation>();

        public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
        {
            foreach (var operation in _operations)
                foreach (var row in operation.Execute(null))
                    yield return row;
        }

        public UnionAllOperation Add(IOperation operation)
        {
            _operations.Add(operation);
            return this;
        }
    }
The problem is that this does everything synchronously: each operation runs to completion before the next one starts. The following code is a better implementation, but note that this is notepad code, with all the implications of that.
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public class UnionAllOperation : AbstractOperation
    {
        private readonly List<IOperation> _operations = new List<IOperation>();

        public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
        {
            var blockingCollection = new BlockingCollection<Row>();
            var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() =>
            {
                foreach (var operation in currentOp.Execute(null))
                {
                    blockingCollection.Add(operation);
                }
                blockingCollection.Add(null); // free the consumer thread
            }));

            Row r;
            while (true)
            {
                if (tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) // all done
                    break;
                r = blockingCollection.Take();
                if (r == null)
                    continue;
                yield return r;
            }
            while (blockingCollection.TryTake(out r))
            {
                if (r == null)
                    continue;
                yield return r;
            }
            Task.WaitAll(tasks.ToArray()); // rethrow any exceptions that were raised during execution
        }

        public UnionAllOperation Add(IOperation operation)
        {
            _operations.Add(operation);
            return this;
        }
    }
The usual caveats apply: this is notepad code, and I have never actually run it, much less tested or debugged it.
Feel free to rip into it, though.
Dale Newman made some improvements. The most important one is making sure that we aren't going to evaluate the tasks several times (oops! I told ya it was notepad code): without the ToArray() call, the lazy Select starts a fresh set of tasks every time tasks is enumerated. It now looks like this:
    /// <summary>
    /// Combines rows from all operations.
    /// </summary>
    public class UnionAllOperation : AbstractOperation {

        private readonly List<IOperation> _operations = new List<IOperation>();

        /// <summary>
        /// Executes the added operations in parallel.
        /// </summary>
        /// <param name="rows"></param>
        /// <returns></returns>
        public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {

            var blockingCollection = new BlockingCollection<Row>();

            Debug("Creating tasks for {0} operations.", _operations.Count);

            var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() => {
                Trace("Executing {0} operation.", currentOp.Name);
                foreach (var row in currentOp.Execute(null)) {
                    blockingCollection.Add(row);
                }
                blockingCollection.Add(null); // free the consumer thread
            })).ToArray();

            Row r;
            while (true) {
                if (tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) {
                    Debug("All tasks have been canceled, have faulted, or have completed.");
                    break;
                }

                r = blockingCollection.Take();
                if (r == null)
                    continue;

                yield return r;

            }

            while (blockingCollection.TryTake(out r)) {
                if (r == null)
                    continue;
                yield return r;
            }

            Task.WaitAll(tasks); // rethrow any exceptions that were raised during execution

        }

        /// <summary>
        /// Initializes this instance
        /// </summary>
        /// <param name="pipelineExecuter">The current pipeline executer.</param>
        public override void PrepareForExecution(IPipelineExecuter pipelineExecuter) {
            foreach (var operation in _operations) {
                operation.PrepareForExecution(pipelineExecuter);
            }
        }

        /// <summary>
        /// Add operation parameters
        /// </summary>
        /// <param name="ops">operations delimited by commas</param>
        /// <returns></returns>
        public UnionAllOperation Add(params IOperation[] ops) {
            foreach (var operation in ops) {
                _operations.Add(operation);
            }
            return this;
        }

        /// <summary>
        /// Add operations
        /// </summary>
        /// <param name="ops">an enumerable of operations</param>
        /// <returns></returns>
        public UnionAllOperation Add(IEnumerable<IOperation> ops) {
            _operations.AddRange(ops);
            return this;
        }

    }
Isn't that the purpose of Parallel LINQ?
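For illustration only, and not something taken from the post, a PLINQ-based union might look roughly like the sketch below. The class name PlinqUnionSketch is made up, and each inner sequence is a hypothetical stand-in for operation.Execute(null) so that the example does not depend on Rhino ETL. One caveat: PLINQ limits its degree of parallelism (by default to roughly the number of cores), so with many operations they may not all run at the same time.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class PlinqUnionSketch
{
    // Hypothetical helper: each inner sequence stands in for operation.Execute(null).
    public static IEnumerable<T> UnionAll<T>(IEnumerable<IEnumerable<T>> sources)
    {
        return sources
            .AsParallel()
            // Ask PLINQ not to fall back to sequential execution for this simple query.
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            // Flatten all sources into one sequence; output order is not guaranteed.
            .SelectMany(source => source);
    }
}
```

Because the result order is unspecified, callers that need a stable order would have to sort afterwards.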
Kind of a stupid question: why not use GetConsumingEnumerable() + CompleteAdding, like this (https://gist.github.com/v2m/4969559)?
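The gist is not reproduced here, but the general shape of that approach can be sketched as follows. This is a minimal illustration under stated assumptions, not the gist's code: ConsumingUnionSketch is a made-up name, each inner sequence is a hypothetical stand-in for operation.Execute(null), and Task.Run and Task.WhenAll assume .NET 4.5 or later.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ConsumingUnionSketch
{
    // Hypothetical helper: each inner sequence stands in for operation.Execute(null).
    public static IEnumerable<T> UnionAll<T>(IEnumerable<IEnumerable<T>> sources)
    {
        var queue = new BlockingCollection<T>();

        // ToArray() starts every producer exactly once.
        var producers = sources
            .Select(source => Task.Run(() =>
            {
                foreach (var item in source)
                    queue.Add(item);
            }))
            .ToArray();

        // Close the collection once all producers are done, whether they
        // succeeded or failed, so the consumer loop below always terminates.
        var closer = Task.WhenAll(producers)
            .ContinueWith(_ => queue.CompleteAdding());

        // Blocks while the queue is empty and ends after CompleteAdding().
        foreach (var item in queue.GetConsumingEnumerable())
            yield return item;

        closer.Wait();
        Task.WaitAll(producers); // rethrow producer exceptions, if any
    }
}
```

With this shape there is no null sentinel and no polling of task state: the collection itself signals that no more rows are coming.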
Clever and simple code.
But I don't understand what is the purpose of: "blockingCollection.Add(null); // free the consumer thread"? Could someone explain?
Also, isn't there a race condition in the first while loop? Imagine the last task has added all of its rows but has not yet finished. In the meantime, the main thread consumes all the rows. At this point the check tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted) will return false, but since no new rows will be added, the call to blockingCollection.Take() will block forever.
Whut, the whole point of the null is to avoid the race condition. Either all the tasks are done, in which case you'll have a null (freeing you from the Take()), or not.
I believe that this null helps when there are no rows returned by currentOp, but the race condition is still there. Add something like if (currentOp.Name == "MarkerOperation") { Thread.Sleep(TimeSpan.FromSeconds(5)); } after blockingCollection.Add(null) to make it visible.
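One way to sidestep the window described above is to stop polling task state and count sentinels instead: the consumer exits once it has seen one null per producer. The sketch below is only an illustration of that idea, not code from the post. SentinelUnionSketch is a made-up name, each inner sequence is a hypothetical stand-in for operation.Execute(null), and it assumes the sources never yield null themselves.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SentinelUnionSketch
{
    // Hypothetical helper: each inner sequence stands in for operation.Execute(null).
    // Assumes real items are never null, since null is used as the sentinel.
    public static IEnumerable<T> UnionAll<T>(IEnumerable<IEnumerable<T>> sources)
        where T : class
    {
        var queue = new BlockingCollection<T>();

        var producers = sources
            .Select(source => Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var item in source)
                        queue.Add(item);
                }
                finally
                {
                    // Exactly one sentinel per producer, even if it throws.
                    queue.Add(null);
                }
            }))
            .ToArray();

        // Exit after one sentinel per producer rather than checking task state,
        // so a producer that has sent its sentinel but not yet completed
        // cannot leave the consumer stuck in Take().
        var remaining = producers.Length;
        while (remaining > 0)
        {
            var item = queue.Take();
            if (item == null)
                remaining--;
            else
                yield return item;
        }

        Task.WaitAll(producers); // surface exceptions thrown by the producers
    }
}
```

Since every Take() is matched by either a row or a sentinel that is guaranteed to arrive, the loop does not depend on when a task is marked as completed.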
I want to thank Ayende and the folks that commented on this blog post. I ended up using Vladimir's version in my SQLoogle project, and it works great!
If you're interested in Rhino ETL at all, I just finished an article on CodeProject about SQLoogle, which is mainly about using Ayende's Rhino ETL. It's here:
Thanks again :-)