Ayende @ Rahien


Rhino ETL Union Operation

Yes, it is somewhat of a blast from the past, but I just got asked how to create a good Union All operation for Rhino ETL.

The obvious implementation is:

public class UnionAllOperation : AbstractOperation
{
    private readonly List<IOperation> _operations = new List<IOperation>();

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach (var operation in _operations)
            foreach (var row in operation.Execute(null))
                yield return row;
    }

    public UnionAllOperation Add(IOperation operation)
    {
        _operations.Add(operation);
        return this;
    }
}

The problem is that this does everything synchronously. The following code is a better implementation, but note that this is notepad code, with all the implications of that.

public class UnionAllOperation : AbstractOperation
{
    private readonly List<IOperation> _operations = new List<IOperation>();

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        var blockingCollection = new BlockingCollection<Row>();
        var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() =>
            {
                foreach(var row in currentOp.Execute(null))
                {
                    blockingCollection.Add(row);
                }
                blockingCollection.Add(null); // free the consumer thread
            }));

        Row r;
        while(true)
        {
            if(tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) // all done
                break;
            r = blockingCollection.Take();
            if(r == null)
                continue;
            yield return r;
        }
        while(blockingCollection.TryTake(out r))
        {
            if(r == null)
                continue;
            yield return r;
        }
        Task.WaitAll(tasks.ToArray()); // raise any exceptions that were raised during execution
    }

    public UnionAllOperation Add(IOperation operation)
    {
        _operations.Add(operation);
        return this;
    }
}

Usual caveats apply: this is notepad code, I never actually ran it, much less tested or debugged it.

Feel free to rip into it, though.

Dale Newman made some improvements, the most important one being to make sure that we aren't going to evaluate the tasks several times (oops! I told ya it was notepad code), and now it looks like this:

/// <summary>
/// Combines rows from all operations.
/// </summary>
public class UnionAllOperation : AbstractOperation {

    private readonly List<IOperation> _operations = new List<IOperation>();

    /// <summary>
    /// Executes the added operations in parallel.
    /// </summary>
    /// <param name="rows"></param>
    /// <returns></returns>
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {

        var blockingCollection = new BlockingCollection<Row>();

        Debug("Creating tasks for {0} operations.", _operations.Count);

        var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() => {
            Trace("Executing {0} operation.", currentOp.Name);
            foreach (var row in currentOp.Execute(null)) {
                blockingCollection.Add(row);
            }
            blockingCollection.Add(null); // free the consumer thread
        })).ToArray();

        Row r;
        while (true) {
            if (tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) {
                Debug("All tasks have been canceled, have faulted, or have completed.");
                break;
            }

            r = blockingCollection.Take();
            if (r == null)
                continue;

            yield return r;
        }

        while (blockingCollection.TryTake(out r)) {
            if (r == null)
                continue;
            yield return r;
        }

        Task.WaitAll(tasks); // raise any exceptions that were raised during execution
    }

    /// <summary>
    /// Initializes this instance
    /// </summary>
    /// <param name="pipelineExecuter">The current pipeline executer.</param>
    public override void PrepareForExecution(IPipelineExecuter pipelineExecuter) {
        foreach (var operation in _operations) {
            operation.PrepareForExecution(pipelineExecuter);
        }
    }

    /// <summary>
    /// Add operation parameters
    /// </summary>
    /// <param name="ops">operations delimited by commas</param>
    /// <returns></returns>
    public UnionAllOperation Add(params IOperation[] ops) {
        foreach (var operation in ops) {
            _operations.Add(operation);
        }
        return this;
    }

    /// <summary>
    /// Add operations
    /// </summary>
    /// <param name="ops">an enumerable of operations</param>
    /// <returns></returns>
    public UnionAllOperation Add(IEnumerable<IOperation> ops) {
        _operations.AddRange(ops);
        return this;
    }
}
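
For completeness, wiring this into a process follows the usual EtlProcess registration pattern; something like the sketch below (the two input operations and the output here are hypothetical placeholders):

public class CombinedImportProcess : EtlProcess
{
    public override void Initialize()
    {
        // hypothetical inputs; any IOperation implementations would do
        Register(new UnionAllOperation()
                     .Add(new ReadUsersFromLegacyDb(),
                          new ReadUsersFromCsvFile()));
        // hypothetical output that consumes the unioned rows
        Register(new WriteUsersToWarehouse());
    }
}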

Legacy ETL solutions

I am currently working with a customer on some issues that they have with moving to RavenDB, and we ran into a set of problems with their (Legacy with a capital L) relational database.

They ran into several problems with how to create a good ETL process on top of it, especially with regard to detecting changes and the tendency of the legacy system to re-use old primary keys.

The solution for both is actually fairly easy. Instead of relying on the primary keys of the legacy system, which can be re-used and create a ton of trouble downstream, create your own ids, distinct from the legacy system's ids.

That can be done easily enough by issuing:

ALTER TABLE Customers ADD UniqueKeyForEtl uniqueidentifier NOT NULL DEFAULT(newid())

This is a non-breaking change; you can apply it to any database without fear that it would somehow break an application that is using it. It ensures that every row in the table is going to have a unique, never repeating, never re-used key, and it comes at a very low cost.

The next problem was how to actually detect changes. The Legacy System does have a LastModified column on some tables, and actually bothers to update it in some cases, but not in all of them. Again, the answer is to add a column to the table. The easiest option would probably be to just ensure that LastModified is updated in a trigger, something like:

CREATE TRIGGER UpdateCustomersLastModifiedDate ON Customers
FOR UPDATE
AS
UPDATE Customers SET LastModified = getdate()
FROM Customers INNER JOIN Inserted ON Customers.[UniqueID] = Inserted.[UniqueID]

Maybe with a check to skip the update if the Legacy System already updated it.

The problem is that the Legacy System has so many triggers already that the client is very reluctant to add another one. So another option is to use the rowversion feature in SQL Server. This allows us to define the following:

ALTER TABLE Customers ADD ModifiedVersionForEtl rowversion NOT NULL 

The rowversion will be incremented by the DB on every write, so you can find all the rows that have been updated since the last version that you have seen. This isn't a trigger; it happens as part of the actual update process, and is likely to be significantly cheaper.
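
To make the change-detection side concrete, here is a minimal sketch of the incremental read in plain ADO.NET (the column list is illustrative, and persisting the last seen version between runs is left out):

using System.Collections.Generic;
using System.Data.SqlClient;

public static class CustomerChangeReader
{
    // lastSeenVersion is the highest ModifiedVersionForEtl from the previous run
    // (use new byte[8] for the very first run)
    public static byte[] ReadChangedCustomers(
        string connectionString, byte[] lastSeenVersion, ICollection<string> changedKeys)
    {
        byte[] maxVersion = lastSeenVersion;
        using (var connection = new SqlConnection(connectionString))
        using (var command = connection.CreateCommand())
        {
            command.CommandText =
                @"SELECT UniqueKeyForEtl, ModifiedVersionForEtl
                  FROM Customers
                  WHERE ModifiedVersionForEtl > @lastSeen
                  ORDER BY ModifiedVersionForEtl";
            command.Parameters.AddWithValue("@lastSeen", lastSeenVersion);
            connection.Open();
            using (var reader = command.ExecuteReader())
            {
                while (reader.Read())
                {
                    // feed the row into the ETL pipeline, keyed by the stable GUID
                    changedKeys.Add(reader.GetGuid(0).ToString());
                    // rows are ordered by version, so the last one read is the new high-water mark
                    maxVersion = (byte[])reader["ModifiedVersionForEtl"];
                }
            }
        }
        return maxVersion;
    }
}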

By adding these two columns, an operation that is safe to make since it can't break any code that uses the database, we have given ourselves an easy way to detect changes, and an easy way to get keys that are actually unique and non-repeating.


Public Service Announcement: Git master repositories for the Rhino Tools projects

There have been some changes, and it seems that it is hard to track them. Here is where you can find the master repositories for the Rhino Tools projects:

Rhino ETL Video

Paul Barriere has a video up of a presentation about Rhino ETL:

ETL stands for Extract, Transform, Load. For example, you receive files or other data from vendors or other third parties which you need to manipulate in some way and then insert into your own database. Rhino ETL is an open source C# package that I have used for dozens of production processes quite successfully. By using C# for your ETL tasks you can create testable, reusable components more easily than with tools like SSIS and DTS.

It is good to see more information available on Rhino ETL.

On PSake

James Kovacs introduced psake (a PowerShell based build system) over a year ago, and at the time I gave it a glance and decided that it was interesting, but not worth further investigation.

This weekend, as I was restructuring my Rhino Tools project, I realized that I need to touch the build system as well. The Rhino Tools build system has been through several projects, and was originally ported from Hibernate. It is NAnt based, complex, and can do just about everything that you want except be easily understandable.

It became clear to me very quickly that it ain't going to be easy to change the way it works, nor would it be easy to modify it to reflect the new structure. There are other issues with complex build systems: they tend to create zones of "there be dragons", where only the initiated go, and even they go with trepidation. I decided to take advantage of the changes that I am already making to get a simpler build system.

I had a couple of options open to me: Rake and Bake.

Bake seemed natural, until I remembered that no one has touched it in a year or two. Besides, I can only stretch NIH so far :-). And while I know that people rave about Rake, I did not want to introduce a Ruby dependency into my build system. I know that it was an annoyance when I had to build Fluent NHibernate.

One thing that I knew I was not willing to go back to was editing XML, so I started looking at other build systems, and ended up running into PSake.

There are a few interesting things that reading about it brought to mind. First, NAnt doesn't cut it anymore. It can't build WPF applications or handle multi-targeting well. Second, I am already managing the compilation part of the build using MSBuild, thanks to Visual Studio.

That leaves the build system with executing msbuild, setting up directories, executing tests, running post-build tools, etc.

PSake handles those well, since the execution environment is the command line. The syntax is nice, just enough to specify tasks and dependencies, but everything else is just pure command line. The following is the Rhino Mocks build script, using PSake:

properties { 
  $base_dir  = resolve-path .
  $lib_dir = "$base_dir\SharedLibs"
  $build_dir = "$base_dir\build" 
  $buildartifacts_dir = "$build_dir\" 
  $sln_file = "$base_dir\Rhino.Mocks-vs2008.sln" 
  $version = "3.6.0.0"
  $tools_dir = "$base_dir\Tools"
  $release_dir = "$base_dir\Release"
} 

task default -depends Release

task Clean { 
  remove-item -force -recurse $buildartifacts_dir -ErrorAction SilentlyContinue 
  remove-item -force -recurse $release_dir -ErrorAction SilentlyContinue 
} 

task Init -depends Clean { 
    . .\psake_ext.ps1
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks $version" `
        -version $version `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks.Tests\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks Tests $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks Tests $version" `
        -version $version `
        -clsCompliant "false" `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks.Tests.Model\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks Tests Model $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks Tests Model $version" `
        -version $version `
        -clsCompliant "false" `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    new-item $release_dir -itemType directory 
    new-item $buildartifacts_dir -itemType directory 
    cp $tools_dir\MbUnit\*.* $build_dir
} 

task Compile -depends Init { 
  exec msbuild "/p:OutDir=""$buildartifacts_dir "" $sln_file"
} 

task Test -depends Compile {
  $old = pwd
  cd $build_dir
  exec ".\MbUnit.Cons.exe" "$build_dir\Rhino.Mocks.Tests.dll"
  cd $old        
}

task Merge {
    $old = pwd
    cd $build_dir
    
    Remove-Item Rhino.Mocks.Partial.dll -ErrorAction SilentlyContinue 
    Rename-Item $build_dir\Rhino.Mocks.dll Rhino.Mocks.Partial.dll
    
    & $tools_dir\ILMerge.exe Rhino.Mocks.Partial.dll `
        Castle.DynamicProxy2.dll `
        Castle.Core.dll `
        /out:Rhino.Mocks.dll `
        /t:library `
        "/keyfile:$base_dir\ayende-open-source.snk" `
        "/internalize:$base_dir\ilmerge.exclude"
    if ($lastExitCode -ne 0) {
        throw "Error: Failed to merge assemblies!"
    }
    cd $old
}

task Release -depends Test, Merge {
    & $tools_dir\zip.exe -9 -A -j `
        $release_dir\Rhino.Mocks.zip `
        $build_dir\Rhino.Mocks.dll `
        $build_dir\Rhino.Mocks.xml `
        license.txt `
        acknowledgements.txt
    if ($lastExitCode -ne 0) {
        throw "Error: Failed to execute ZIP command"
    }
}

It is about 50 lines, all told, with a lot of spaces and is quite readable.

This handles the same tasks as the old set of scripts did, and it does this without undue complexity. I like it.

The complexity of unity

This post is about the Rhino Tools project. It has been running for a long time now, over 5 years, and has amassed quite a few projects in it.

I really like the codebase of the projects in Rhino Tools, but secondary aspects have been creeping in that made managing the project harder. In particular, putting all the projects in a single repository made taking dependencies easy, far too easy. Projects had an easy time taking dependencies that they shouldn't have, and the entire build process was… complex, to say the least.

I have been somewhat unhappily tolerant of this because, while it was annoying, it didn't actively create problems for me so far. The problems started creeping in when I wanted to move Rhino Tools to use NHibernate 2.1. That is when I realized that this was going to be a very painful process, since I would have to take on the entire Rhino Tools set of projects in one go, instead of dealing with each of them independently. The fact that so many of the dependencies were in Rhino Commons, for which I have a profound dislike, helped increase my frustration.

There are other things that I find annoying now. Rhino Security is a general purpose library for NHibernate, but it makes a lot of assumptions about how it is going to be used, which is wrong. Rhino ETL had a dependency on Rhino Commons because of three classes.

To resolve that, I decided to make a few other changes: taking dependencies is supposed to be a hard process, it is supposed to make you think.

I have been working on splitting the Rhino Tools project into all its sub-projects, so each of them is independent of all the others. That increases the effort of managing all of them as a unit, but decreases the effort of managing them independently.

The current goals are to:

  • Make it simpler to treat each project independently
  • Make it easier to deal with the management of each project (dependencies, build scripts)

There is a sideline in which I am also learning to use Git, and there is a high likelihood that the separate Rhino Tools projects will move to github. Subversion's patching & tracking capabilities annoyed me for the very last time about a week ago.

Rhino ETL 2.0

Rhino ETL was born out of a need. I need to do a lot of ETL type operations. Those include anything from moving data from legacy databases to my database, importing files, importing data over web services, etc. For a while, I have used SSIS for those needs. It has proven... inadequate. Mostly in terms of ease of development, deployment, error handling, etc.

This is my third attempt at building an ETL tool. The third time is much shorter and clearer than both previous attempts.

The goals for the project were:

  • Developer friendly:
    • Errors
    • Development
    • Deployment
  • Performant - I need to move large amounts of data around, and I need to do it fast.
  • Easy - The hard part should be handling the required transforms, dealing with the logic for in/out, etc.
  • Unlimited - You should not be limited to what the tool gives you, and you should be able to integrate easily with other tools
  • Language agnostic - You should be able to develop solutions for this using C#/VB/Boo
  • DSL - Provide a DSL to make it even easier to work with

The overall concept is based around these two classes, and the idea of a pipeline:

[class diagrams of the two core classes]

Here is a simple operation, which just generates all the even numbers up to a million:

public class EvenNumberToMillion : AbstractOperation
{
	public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
	{
		for(int i = 2; i< 1000000; i += 2)
		{
			Row row = new Row();
			row["number"] = i;
			yield return row;
		}
	}
}

This is an input operation: it ignores its rows parameter and yields rows generated some other way. As you can see, we yield a row for each iteration.

We combine operations into a pipeline using a process. If we wanted to print the numbers, we would build the following pipeline process:

public class PrintNumbersProcess : EtlProcess
{
	public override void Initialize()
	{
		Register(new EvenNumberToMillion());
		Register(new PrintNumbers());
	}
}
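
The PrintNumbers operation isn't shown in the post; a minimal version could look something like this (a sketch, assuming the "number" key used by EvenNumberToMillion):

public class PrintNumbers : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach (Row row in rows)
        {
            // print the value produced by the previous operation in the pipeline
            Console.WriteLine(row["number"]);
            yield return row; // pass the row along, in case another operation follows
        }
    }
}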

All the output of the first operation goes into the second operation, and so on and so forth. Using the DSL it will look like:

operation EvenNumbersToMillion:
	for i in range(1000000,2):
		yield Row(Number: i)

operation PrintNumbers:
	for row in rows:
		print row.Number

process PrintNumbersProcess:
	EvenNumbersToMillion()
	PrintNumbers()

This is just to demonstrate the concept of the pipeline. Now we can get into the interesting operations. As you already surmised, AbstractOperation is the common base class, and you can inherit it to produce rows from any source.

Inputs

Rhino ETL offers special support for getting data from a database. It means that you can define it as simply as:

public class ReadUsers : ConventionInputCommandOperation
{
    public ReadUsers() : base("test")
    {
        Command = "SELECT Id, Name,Email FROM Users";
    }
}

Or, using DSL:

input "test", Command = "SELECT id, name, email  FROM Users"

Note the "test" here, it is the name of the connection string in the app.config.

Outputs

On the output side, we have more interesting options. We can write any custom operation that we want, of course, but for working with databases, we have the following options:

Standard DB commands:

public class FibonacciOutput : ConventionOutputCommandOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

You'll note that I am not specifying the parameters, those are taken implicitly from the current row.

Using DSL:

output "test", Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)"

SQL Batch operations:

public class FibonacciOutput : ConventionSqlBatchOpeartion
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

Using DSL: Haven't written that yet :-(

Sql Bulk Insert:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
    public FibonacciBulkInsert() : base("test", "Fibonacci")
    {
    }

    protected override void PrepareSchema()
    {
        Schema["id"] = typeof (int);
    }
}

Using DSL: Haven't written that yet :-(

Files

For working with files, we have the support of the excellent FileHelpers library, which makes working with files really easy.

Reading from a file is simply:

public class UserRecord
{
    public string Email;
    public int Id;
    public string Name;
}

public class ReadUsersFromFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        using(FileEngine file = FluentFile.For<UserRecord>().From("users.txt"))
        {
            foreach (object obj in file)
            {
                yield return Row.FromObject(obj);
            }
        }
    }
}

There is a 1:1 translation to Boo here, so I'll spare you that.

Writing is very similar:

public class WriteUsersToFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        FluentFile engine = FluentFile.For<UserRecord>();
        engine.HeaderText = "Id\tName\tEmail";
        using(FileEngine file = engine.To("users.txt"))
        {
            foreach (Row row in rows)
            {
                UserRecord record = new UserRecord();
                
                record.Id = (int)row["id"];
                record.Name = (string)row["name"];
                record.Email = (string)row["email"];

                file.Write(record);
            }
        }
        yield break;
    }
}
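
Hooking the two file operations above into a process follows the same registration pattern shown earlier; a sketch:

public class CopyUsersFileProcess : EtlProcess
{
    public override void Initialize()
    {
        Register(new ReadUsersFromFile());
        // any transforms would be registered here, between the input and the output
        Register(new WriteUsersToFile());
    }
}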

Joins

Joins are an interesting concept, and I have played with them quite extensively recently. Joins in Rhino ETL are implemented as sub-processes. Hash joins are very simple:

public class JoinUsersAndIds : JoinOperation
{
	protected override Row MergeRows(Row leftRow, Row rightRow)
	{
		Row row = leftRow.Clone();
		row["user_id"] = rightRow["new_id"];
		return row;
	}

	protected override void SetupJoinConditions()
	{
		InnerJoin
			.Left("id")
			.Right("old_id");
	}
}

This is just the operation; you hook it up in the process using:

Register(new JoinUsersAndIds()
         	.Left(new GenerateUsers(25000))
         	.Right(new GenerateRandomIds(15000)));

Each side is capable of accepting a full blown sub process on its own.

Nested loops joins are appropriate for the more complex cases:

public class FullJoinUsersToPeopleByEmail : NestedLoopsJoinOperation
{
    protected override bool MatchJoinCondition(Row leftRow, Row rightRow)
    {
        return LeftJoin(leftRow["email"], rightRow["email"]);
    }

	protected override Row MergeRows(Row leftRow, Row rightRow)
    {
        Row row = new Row();
        row.Copy(leftRow);
        row["person_id"] = rightRow["id"];
        return row;
    }
}

Using DSL it looks like this:

join get_user_roles:
	left:
		input "test", Command = "SELECT id, name, email  FROM Users"
		
	right:
		nput "test", Command = "SELECT userid, roleid FROM UsersToRoles"
			
	on left.id ==  right.userid:
		row.Name = left.Name
		row.Role = right.RoleId

(A note about this syntax: it currently generates a nested loops join; I intend to make it generate an optimized version soon.)

Branching

That was quite a challenge to implement; I kept missing a key point, and that tripped me up for a while. Here is how to send a row to several sources:

BranchingOperation split = new BranchingOperation()
	.Add(Partial
		.Register(new MultiplyByThreeOperation())
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()))
	.Add(Partial
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()));
Register(split);

(Note, the implementation is not as optimized as it could be.)

Aggregates

Well, you've got to have those, don't you? Here is a simple Row Count aggregate:

public class RowCount : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        if (aggregate["count"] == null)
            aggregate["count"] = 0;

        int count = (int)aggregate["count"];
        aggregate["count"] = count + 1;
    }
}

We are called once per row, and can accumulate all the values that we want. We can use grouping to create more interesting results:

public class CostPerProductAggregation : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if(aggregate["cost"]==null)
            aggregate["cost"] = 0;
        aggregate["cost"] = ((int) aggregate["cost"]) + ((int) row["price"]);
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new string[] {"name"};
    }
}

We can also override the FinishAggregation(Row) method to complete any calculations after all the rows have been processed. Rhino ETL guarantees that we will get the same aggregate row for all the rows that match on the grouped columns, so that is taken care of.
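
To illustrate FinishAggregation, here is a hedged sketch of an aggregation that computes an average price per product name, following the override described above (the column names are assumptions):

public class AveragePricePerProduct : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if (aggregate["total"] == null)
        {
            aggregate["total"] = 0;
            aggregate["count"] = 0;
        }
        aggregate["total"] = ((int)aggregate["total"]) + ((int)row["price"]);
        aggregate["count"] = ((int)aggregate["count"]) + 1;
    }

    protected override void FinishAggregation(Row aggregate)
    {
        // called once per aggregate row, after all matching rows have been accumulated
        aggregate["average"] = ((int)aggregate["total"]) / (double)(int)aggregate["count"];
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new[] { "name" };
    }
}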

Using DSL for that is simply:

aggregate join_product_names:
	accumulate:
		aggregate.names = [] if aggregate.names is null
		aggregate.names.Add(row.name)
	
	terminate:
		aggregate.result = string.Join(", ", aggregate.names.ToArray(string))

That is about it for now, I think. You can get the source for Rhino ETL here:

https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/rhino-etl

I plan to have a binary release once I am confident that all the notes in this post are no longer relevant.

Algorithms, joins and performance

I thought about moving from hashtables to Dictionary<T,K>, and I got interesting results.

For a simple new Dictionary<string,object>(), I expected a significant improvement, but I got this:

[profiler results]

This is actually much worse than the result of hashtable + ignore case comparison.

When I used that, I got this horrendous result:

[profiler results]

I tried various other tricks, but none of them changes the fact that making 7.5 million calls is going to cost a lot of time. And I want to support more than just 2,500 x 1,500.

I changed the implementation to look like this:

rightRowsByJoinKey = {}
for rightRow in right:
	key = rightRow.CreateKey( rightJoinParams )
	rightRowsByJoinKey[ key ] = [] unless rightRowsByJoinKey[ key ]
	rightRowsByJoinKey[ key ].Add(rightRow)

for leftRow in left:
	key = leftRow.CreateKey( leftJoinParams )
	for matchingRight in rightRowsByJoinKey[ key ] :
		yield MergeRows( leftRow, matchingRight )

Now I have N + M, instead of N*M.
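
In C#, the same idea is just a dictionary of buckets keyed by the join key; a sketch (the key-extraction and merge delegates stand in for whatever CreateKey and MergeRows do on a Row):

static IEnumerable<Row> HashJoin(
    IEnumerable<Row> left, IEnumerable<Row> right,
    Func<Row, object> leftKey, Func<Row, object> rightKey,
    Func<Row, Row, Row> mergeRows)
{
    // build phase: index the right side by its join key (M operations)
    var rightRowsByJoinKey = new Dictionary<object, List<Row>>();
    foreach (Row rightRow in right)
    {
        object key = rightKey(rightRow);
        List<Row> bucket;
        if (!rightRowsByJoinKey.TryGetValue(key, out bucket))
            rightRowsByJoinKey[key] = bucket = new List<Row>();
        bucket.Add(rightRow);
    }

    // probe phase: each left row only touches its own bucket (N operations)
    foreach (Row leftRow in left)
    {
        List<Row> matches;
        if (rightRowsByJoinKey.TryGetValue(leftKey(leftRow), out matches))
            foreach (Row matchingRight in matches)
                yield return mergeRows(leftRow, matchingRight);
    }
}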

From a performance perspective, it means that doing a nested loops join on 2,500 x 1,500 results in 3.75 million comparisons, which is quite a bit, even for such a small set of rows. It took over 6 seconds to run on my machine.

A hash join, however, will perform a measly 4,000 operations (N + M) to do the same amount of work. On my machine, 2,500 x 1,500 completes in 0.2 seconds, most of which is spent just initializing the framework.

I took it for a spin with two orders of magnitude more rows; 250,000 x 150,000 completed in 5.48 seconds, which is very encouraging.

Hash join is not applicable if you want to join over anything but equality, which is why we need the nested loops join as well.

Performance, Joins and why you should always have a profiler

I ran a heavy duty import process yesterday, and we ran into a severe performance issue with Rhino ETL joins. Five joins, with about 250,000 records on the initial left side and a few tens of thousands on the right sides, took about 2 hours to complete.

That was unacceptable, and I decided that I had to fix this issue. I had a fairly good idea about what the issue was. Rhino ETL supports only nested loops joins at the moment, which means that the join is performed as (pseudo code):

for leftRow in left:
	for rightRow in right:
		if MatchJoinCondition(leftRow, rightRow):
			yield MergeRows(leftRow, rightRow)

Obviously the N*M was what was causing the problem, right? I quickly built a trivial join test, which joined 2,500 rows on the left with 1,500 rows on the right. Trivial stuff, and it should result in 1,500 rows returned.

It executed in 6 seconds. That was a shock.

Well, 1,500 * 2,500 = 3,750,000, but it shouldn't be that bad.

Then I ran the code under a profiler; it completed in 29 seconds, but I also saw this:

[profiler screenshot]

It is not the nested loop that cost me all this money, it was the hash table lookups!

The most expensive call was GetHashCodeOfString; we have some globalization stuff here because I told the hashtable to be case insensitive. I tried removing that and ran it under the profiler again; now it dropped to 18 seconds, and we had this cost structure to deal with:

[profiler screenshot]

We still spend almost all of our time just doing hash table lookups, although we dropped by 10 seconds this time.

I don't think that I would have ever considered the cost of simply doing the hashtable lookups as the primary cost of the join operations.
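
For what it is worth, one way to keep case-insensitive lookups while skipping the culture-aware GetHashCodeOfString path is to hand the dictionary an ordinal comparer; a sketch (whether it actually helps here would need another run under the profiler):

using System;
using System.Collections.Generic;

class OrdinalLookupExample
{
    static void Main()
    {
        // ordinal, case-insensitive hashing avoids the globalization machinery entirely
        var row = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
        row["Email"] = "foo@example.org";
        Console.WriteLine(row["email"]); // still a case-insensitive lookup
    }
}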

Fluent Pipelines

I am having a discussion with Jon Skeet about the merits of using Linq for pipelines, and delegates/lambdas instead of classes.

I kept saying that I don't really see the point, so I went ahead and implemented this:

GenerateData(10000)//enumerator from 0 .. 10000
	.Where(i => i % 3 == 0)
	.Transform(i => (i * 2).ToString() )
	.Act(i => Console.WriteLine(i))
	.Execute();

This uses the same approach as my previous pipeline, but it does it in C# 3.0, so it can use things like extension methods, which make this nicer. The same in C# 2.0 is possible, but takes a ridiculous amount of code to do.

This code is much simpler than the code I have shown here, no?
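
For reference, the extension methods used above could be implemented along these lines; a sketch, not the actual code from that discussion (Where comes from Linq, the rest are trivial to write by hand):

using System;
using System.Collections.Generic;

public static class PipelineExtensions
{
    // apply a projection to each item as it streams through
    public static IEnumerable<TOut> Transform<TIn, TOut>(
        this IEnumerable<TIn> source, Func<TIn, TOut> transform)
    {
        foreach (TIn item in source)
            yield return transform(item);
    }

    // perform a side effect per item, passing the item along unchanged
    public static IEnumerable<T> Act<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (T item in source)
        {
            action(item);
            yield return item;
        }
    }

    // drain the pipeline; nothing executes until something enumerates it
    public static void Execute<T>(this IEnumerable<T> source)
    {
        foreach (T item in source)
        {
        }
    }
}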

Why do I use the first approach then?

Scalability.

What we are seeing here is about as trivial as it can get. What happens when we have more complex semantics?

Let us take writing to the database as an example. We need to do quite a bit there, more than we would put in a lambda expression, certainly. We can extract it to a method, but then we run into another problem: we can't do method inheritance. This means that I have no easy way of abstracting the common stuff away. Well, I can use a template method, but that works if and only if I have a single place where I want to change behavior.

As an example of scaling, let us take this piece of code:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
	public FibonacciBulkInsert() : base("test", "Fibonacci")
	{
	}

	protected override void PrepareSchema()
	{
		Schema["id"] = typeof (int);
	}
}

It uses its base class to handle the bulk of the work.

One thing that Jon mentioned that was interesting was the ability to take advantage of Linq-specific improvements, such as PLinq. This is indeed a consideration, but upon some reflection, I realized that the two are not mutually exclusive. If I want to take advantage of any of that, all I need to do is modify the pipeline to iterate using PLinq rather than foreach.

My Code Sucks

There is a point where a project goes beyond the pale, where the complexity goes so far out of line that it is simply ludicrous.

I hit such a point today. I had had enough of SSIS and decided that I wanted to replace it with something better. I wrote an ETL tool to handle that in a few hours.

Why is this relevant? Because I have already built an ETL tool. Rhino ETL.

It is quite telling when the author of a tool decides that he doesn't want to use it.

I was decidedly proud of Rhino ETL for a while, then the problems started to creep in. The problems were not in the code per se; the entire architecture of the code was overly complex. In order to handle this complexity, I had resorted to throwing code at the problem, and then more code, and more code yet again.

At the moment, the current code base has two "minor" problems: exception handling and threading. The bigger problem is that I don't want to have to wade into this stinking pile and try to figure out what is going on there. I tried to be clever, and it is clever, in a horrible sort of way.

I don't have the time or patience to decipher code at the best of times, and at this point, it has gotten simply too complex. The project right now is at ~9,000 lines of code, so it is not that it is big, it is simply complex.

From the architecture perspective, I made one huge mistake: I exposed the threading model to the application code. You could say that this stands at the root of my problems. I actually rewrote this once already, moving from a home-grown threading solution to using Retlang for threading. I made the same mistake and exposed the threading model to the application itself. Can you say: big mistake!

From the point of view of the project itself, I started by defining the DSL syntax, and then built the project around that. It turned out that this had the usual "let us build a whole layer at a time" problem. It also meant that a lot of the code had deep assumptions about the way it was called, making it unusable in other ways. This is excusable if we are talking about the DSL mapping layer, but not for the core code base itself.

Anyway, I am ranting and I should stop.

I spent six to eight hours today rewriting it from scratch. It doesn't do threading, and it doesn't have a DSL interface yet, but it does pretty much everything that the old project did, in a quarter of the lines of code, and in a way that is much safer and easier to handle than what we are using currently.

Performing joins without having all the data in memory

Probably the easiest way to perform a join is with a nested loop. Given dataset A and dataset B, joining all the rows into dataset C is simple:

for row_a in A:
	for row_b in B:
		if condition(row_a, row_b):
			add join(row_a, row_b), C

Supporting left/right/cross joins is a simple matter from here, but this has the issue of having to have both datasets in memory. I ran into it while processing big files; I don't want to have to hold them in memory, especially if I need several levels of joins in order to correctly process them.

I thought about bypassing the entire issue by simply writing the data down to a sqlite DB, and doing the join there. While this is possible, I would rather avoid having to do this.

Any suggestions on where to look to solve this issue?

Rhino ETL: Importing Data into MS CRM

Okay, so this is the "coding in anger" part for Rhino ETL. I need to import files into MS CRM entities. The files are standard CSV files, with the usual corruption of values that such files have. The CRM is accessed through its web services, although I am keeping the option of direct DB access in reserve, in case I can't get the Web Services to perform any faster.

The first problem that I had was that the MS CRM Web Services are not simple services. They accept entities that are defined in the WSDL for them, not simple values. That put me in a complexity spin for a while, until I remembered that I am not working in my own little language, I am working on .NET. A quick trip to Visual Studio and an Add Web Reference + Compile later, I had integrated accessing the MS CRM into Rhino ETL.

Here is how it was done:

import CrmProxy.Crm from CrmProxy

Basically it means that I now had a dll that contains the proxy definitions for the web service, and I imported it. So it is incredibly easy to use.

Then it was a matter of reading the file. Rhino ETL has integrated with the FileHelpers library, and I couldn't really be happier about it. There are several reasons for that, but the main one is that I ran into something that the library can't handle, and I fixed that in 10 minutes, without changing the library code. Speaking of software that I like, this is one of the main criteria that I use to evaluate a piece of software: what happens when I step off the ledge? With FileHelpers, I can extend it so easily that I really don't care about that.
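
To give an idea of what such an extension looks like, a custom converter in FileHelpers is just a class deriving from ConverterBase. Something along these lines is roughly what the DateTimeConverterWithNullValue referenced below would be (a sketch, not the actual code):

using System;
using System.Globalization;
using FileHelpers;

public class DateTimeConverterWithNullValue : ConverterBase
{
    private readonly string format;
    private readonly string nullValue;

    // FileHelpers passes the [FieldConverter] arguments to the converter's constructor
    public DateTimeConverterWithNullValue(string format, string nullValue)
    {
        this.format = format;
        this.nullValue = nullValue;
    }

    public override object StringToField(string from)
    {
        if (from == null || from.Trim() == nullValue)
            return null; // treat the sentinel value ("00/00/0000") as a null date
        return DateTime.ParseExact(from.Trim(), format, CultureInfo.InvariantCulture);
    }

    public override string FieldToString(object fieldValue)
    {
        return fieldValue == null ? nullValue : ((DateTime)fieldValue).ToString(format);
    }
}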

Anyway, here is a part of the class definition for our file: 

[DelimitedRecord(","), IgnoreFirst]
class Customer:
      [FieldConverter(ConverterKind.Date, "dd/MM/yyyy")] 
      UpdateDate as date
      Id as int
      Name as string
      ResponsibleEmployee as Nullable of int
      [FieldConverter(Rhino.ETL.FileHelpersExtensions.DateTimeConverterWithNullValue, "dd/MM/yyyy","00/00/0000")] 
      ReceptionDate as Nullable of date

As you can see, there isn't much to it except defining the fields, types, etc.

source CustomersFile:
     execute:
            file = Read(typeof(Customer)).From(Configuration.CustomerFile)
            file.OnError(ErrorMode.SaveAndContinue)
            for customer in file:
                  print "Source ${customer.Id}"
                  SendRow( Row.FromObject(customer) ) 
            if file.HasErrors:
                  file.OutputErrors(Configuration.CustomerErrorsFile)
                  AddError("Errors have been written to ${Configuration.CustomerErrorsFile}")

Here I read from the file, use Row.FromObject() to translate an entity into a row, and then send it forward. One amazing thing here is that FileHelpers will generate an errors file for me on demand. And that one is clear and concise and actually useful. Compared to the amount of effort that I know is required to pull reasonable errors from SSIS file input, that is a great pleasure.

Anyway, if you missed that, I am very happy about FileHelpers.

Another thing to point out is the Configuration.CustomerFile, etc. The Configuration object is dynamically populated from a config file that you can pass to Rhino ETL (command line arg), which is a simple xml file in the format:

<configuration>
	<CustomerErrorsFile>D:\customers_errors.txt</CustomerErrorsFile>
</configuration>

Why XML? Because this seems like a place where I would want to touch it with stuff like xmlpoke, etc., so it is easier to work with. It is also a flat configuration scheme that doesn't have any semantics other than simple key/value pairs.

So, now that I have the data, I can send it to the destination:

destination Crm:
      initialize:
            Parameters.Srv = CrmService(
                  Url: Configuration.Url,
                  Credentials: NetworkCredential(
                        Configuration.Username,
                        Configuration.Password,
                        Configuration.Domain),
                  CallerIdValue: CallerId(CallerGuid: Guid(Configuration.CallerId)),
                  UnsafeAuthenticatedConnectionSharing: true,
                  PreAuthenticate: true
                  )

      onRow:
            theAccount = account(
                  accountnumber: Row.Id.ToString(),
                  name: Row.Name,
                  telephone1: Row.Phone,
                  telephone2: Row.Cellular,
                  telephone3: Row.AdditionalPhone,
                  fax: Row.Fax,
                  accountreceptiondate: CrmDateTime(Value: Row.ReceptionDate.ToString("yyyy-MM-ddT00:00:00")),
                  address1_city: Row.City,
                  )
            result = Parameters.Srv.Create(theAccount)
            print "Created account ${Row.Id} -> ${result}"

      cleanUp:
            Parameters.Srv.Dispose()

As you can see, we have the initialize method, which creates the service; then we instantiate an account instance, fill it with the required parameters, and go to town. Also notable is the easy translation from CLR types to CRM types, as in the case of accountreceptiondate.

All in all, the only difficulties that I had during this were in making heads or tails of the information in the file, which is where I want the difficulty to lie when I am dealing with ETL processes.

We want to build something... beautiful!

[screenshot of the Rhino ETL UI]

I am currently working on the UI for Rhino ETL, and it is an interesting experience. In the past, I have usually started with building the functionality and then adding the UI for it. That failed. The problem wasn't that the functionality wasn't there, it was that the UI wasn't nice enough to use, or pretty enough to attract people.

This time, I am doing it in reverse, I am building the UI first, at least the rough draft of it, and then I intend to go and hook everything up. This means that while the picture on the right looks very nice, it is mostly skeleton with nice UI, without much functionality.

Right now it is just an editor for Rhino ETL scripts, but I am currently hooking up the requirements for a project system that would allow me to build some really interesting functionality on top of the scripts.

You can see some of the ideas that I have in the live view pane, which should allow me to visualize everything in the project for ease of use.

Doing it the other way around is not likely to be possible, sadly, except in the most trivial cases, so I think that I will skip that.

Anyway, the point of this post is that by giving it a good looking UI, I make myself feel much better about the application. Working on an ugly application is distasteful, and you tend to just try to get away from it. Working on a pretty one makes you want to improve its current state.

That is the whole No Broken Windows mentality again, by the way, as well as some self motivation practice. Besides, and that is a secret, working on the UI allows me to consider the back end in a very detailed way without getting too low level.

Almost by accident, there is this Daily WTF post about just this issue. Insightful is not something that I use lightly with regard to Daily WTF posts, but this one certainly is.

If you possibly can, make it pretty, you, your users, and the application will be thankful.

Rhino ETL: Writing to files

Just finished writing the tests for reading and writing files. You can check the script below. With the exception of making the connection syntax consistent with the rest of it, I am going to consider this feature complete; the next thing to do is to work on deployment (basically, a tool that allows you to run the script :-) ).

[DelimitedRecord("\t")]
class Customers:
    public OrderID as int
    public CustomerID as string
    public EmployeeID as int

connection(
    "Database",
    ConnectionType: SqlConnection,
    ConnectionString: "Data Source=localhost;Initial Catalog=ETL_Test; Integrated Security=SSPI;"
    )
source OrdersFromDatabase, Connection="Database":
    Command: "SELECT OrderID, CustomerID, EmployeeID FROM Orders"

destination OrdersFile:
    initialize:
        Parameters.File = Write(Customers).To("output.txt")
    onRow:
        cust = Customers(
            OrderID: Row.OrderID,
            CustomerID: Row.CustomerID,
            EmployeeID: Row.EmployeeID
            )
        Parameters.File.Write(cust)
    cleanUp:
        Parameters.File.Dispose()

pipeline OutputOrders:
    OrdersFromDatabase >> OrdersFile
target default:
    Execute("OutputOrders")


Rhino ETL & FileHelpers Integration

Well, this image really excites me. It excites me because I got it after integrating FileHelpers into Rhino ETL. This image is the result of an ETL script that joined a file against a table, did some additional processing on it, and pushed it to a table in a third database.

I must say that FileHelpers has made this ridiculously easy to do.  All I have left to do is figure out how to test such a thing effectively.

[screenshot of the ETL run output]

For reference, here is the complete script:

[DelimitedRecord("\t")]
class Customers:
      CustomerID as string
      CompanyName as string
      ContactName as string
      ContactTitle as string
      Address as string
      City as string
      Country as string

connection(
      "DestinationDB",
      ConnectionType: SqlConnection,
      ConnectionString: "Data Source=localhost;Initial Catalog=ETL_Test; Integrated Security=SSPI;"
      )

connection(
      "Northwind",
      ConnectionType: SqlConnection,
      ConnectionString: "Data Source=localhost;Initial Catalog=Northwind; Integrated Security=SSPI;"
      )

source CustomersFile:
      execute:
            for customer in Read(typeof(Customers)).From("""Files\CustomersTab.txt"""):
                  SendRow( Row(
                        CustomerId: customer.CustomerID,
                        CompanyName: customer.CompanyName,
                        ContactName: customer.ContactName
                        ))

source OrdersFromDatabase, Connection="Northwind":
      Command: "SELECT * FROM Orders"

join CustomersAndOrders:
      if Left.CustomerID == Right.CustomerID:
            Row.OrderID = Right.OrderID
            Row.CompanyName = Left.CompanyName
            Row.ContactName = Left.ContactName

destination Final, Connection="DestinationDB":
      Command: "INSERT INTO OrdersWareHousing VALUES(@OrderID, @CompanyName,@ContactName)"

pipeline OrdersWareHouse:
      CustomersFile >> CustomersAndOrders.Left
      OrdersFromDatabase >> CustomersAndOrders.Right
      CustomersAndOrders >> Final

target default:
      Execute("OrdersWareHouse")

Rhino ETL: Web Services Source

Well, after some thinking, I figured out that I actually had only two types of sources: database and other. Since other is always going to be code, I decided to start with a web services source, since that is arguably the easiest (not much to do there). It turned out to be more complicated than I assumed, mainly because the .Net 2.0 web service stack has no easy way to do duck typing of web services; it requires compiled web services. I got around that by doing runtime compilation, but still, that is hardly elegant.

Anyway, what I have now is this:

source WebServiceGenerator:
	execute:
		empSrv= WebService(WsdlUrl: "http://localhost:9090/GetEmployees.asmx?wsdl" )
		results = empSrv.GetEmployees("Northwind")
		for result in results:
			SendRow( Row(
				Id: result.Id,
				Name: result.Name
				))

As you can see, the only thing that I really need to do is specify the WSDL url for the web service, and everything from there is fairly natural. The execute block is used to distinguish between database sources (which have command, parameters, etc.) and the "other" sources, such as the one above.

Note: Due to the way Rhino ETL works, the order of the sent rows and the order of their processing may differ. This means that if the web service sends you Emp #1, Emp #2, Emp #3, they may be processed as Emp #1, Emp #3, Emp #2. (Actually, the issue tends to come up with larger amounts of rows, since the problem is the different processing of the batches.)

Next step, supporting Web Service output, which may involve some complexity if the web service expects complex types (and since I know that I need to handle those, I have to support that with dynamic compilation, which is going to make my life interesting :-) ).

After that, I intend to start integrating FileHelpers as a source / destination for files. I will post separately on this, but so far I am impressed with both the ease of the API and the quality of the documentation.

Rhino ETL: Targets

Well, that is two items down my list already: I have added support for targets to Rhino ETL. A target is similar in concept to a target in NAnt; it specifies what needs to be run when the package runs. This allows us to specify how we want to run the various actions that we have.

Here is a simple example:

target default:
	Execute("CopyOrders")
	Execute("MoveCustomers")

As you can see, it just lists the pipelines that we want to run. By default, the target executes all the registered pipelines (or other actions) in parallel. But what happens when you want to run them in sequence?

target default:
	sequence:
		Execute("CopyOrders")
		Execute("MoveCustomers")

Another option is that you have a dependency between two pipelines, but you don't care about the rest. You can do that as well, like this:

target withDependencies:
	copyOrders = Execute("CopyOrders")
	Execute("MoveCustomers")
	Execute("AfterCopyOrders").After(copyOrders)

Next task, transactions...

Rhino ETL: Aggregates

Well, that turned out to be really simple. Check out a simple RowCount:

transform CountRows:
	Context.Items.RowCount = 0 unless Context.Items.RowCount
	Context.Items.RowCount+=1
	RemoveRow()
	OnComplete:
		SendRow( Row(RowCount: Context.Items.RowCount) )
 

And then we have a more complex one, summing two columns:

transform CalcSumOfSalaryAndId:
	unless Context.Items.IdSum:
		Context.Items.IdSum = 0 
		Context.Items.SalarySum = 0
		
	Context.Items.IdSum+=Row.Id
	Context.Items.SalarySum+=Row.Salary
	RemoveRow()
	
	OnComplete:
		SendRow( Row(
			IdSum: Context.Items.IdSum, 
			SalarySum: Context.Items.SalarySum
			) )

 

So, basically we have an initialization section, processing, and, when all the processing is done, the ability to send new rows down the pipeline.


Rhino.ETL: Status Report - Joins, Distinct & Engine work

Thread safety is a bitch.

  • Fully working on SVN now, including all the tests.
  • Lots of work done on the engine side: mostly minor fixes, thread safety, refactoring the way rows are passed between stages in the pipeline, etc.
  • "Local variables" for transforms and joins - Local per pipeline, so you can keep state between runs
  • Joins - Right now it is nested loops / inner join only, since that seems to be the most common scenario that I have. It does mean that I need to queue all the data for the join before it can be passed onward in the pipeline. Here is how you define it:
    join JoinWithTypeCasting_AndTransformation:
    	if Left.Id.ToString() == Right.UserId:
    		Row.Id = Left.Id
    		Row.Email = Left.Email
    		Row.FirstName = Left.Name.Split(char(' '))[0]
    		Row.LastName = Left.Name.Split(char(' '))[1]
    		Row.Organization = Right["Organization Id"]

    It should be mentioned that this is actually not a proper method; I deconstruct the if statement into a condition and a transformation. This should make it easier to implement more efficient join algorithms in the future, since I can execute the condition without the transformation.

  • Support for distinct, which turned out to be fairly easy to handle; this can handle a distinct over the full row or over several columns.
    transform Distinct:
    	Context.Items["Rows"] = {} if Context.Items["Rows"] is null
    	key = Row.CreateKey(Parameters.Columns)
    	if Context.Items["Rows"].ContainsKey(key):
    		RemoveRow()
    		return
    	Context.Items["Rows"].Add(key, Row)

 

What remains to be done?

Well, Rhino.ETL is very promising, but it needs several more engine features before I would say it is possible to go live with it:

  • Aggregators - right now there is no way to handle something like COUNT(*), should be fairly easy to build.
  • Parallel / Sequence / Dependencies between pipelines / actions - I need a way to specify that this set of pipelines / actions should happen in sequence or in parallel, and that some should start after others have completed. This has a direct effect on how transactions would work.
  • Transactions - No idea how to support this; the problem is that it basically means that I need to move all the actions that happen inside a pipeline into a single thread. It also opens some interesting issues regarding database connection life cycles.
  • Non-database destinations / sources - I am thinking that I need, at a minimum, File, WebService and Custom (code). I need to evaluate using FileHelpers as the provider for all the file processing.
  • Error handling - abort the current processing on error
  • Packaging - Command line tool to run a set of scripts
  • More logging
  • Standard library - things like count, sum, distinct, etc. Just a set of standard transforms that can be easily used.

The code is alive and well now, so you can check it out and start looking. I will appreciate any commentary you have, and would appreciate patches even more :-)