time to read 40 min | 7916 words

Yes, it is somewhat of a blast from the past, but I just got asked how to create a good Union All operation for Rhino ETL.

The obvious implementation is:

public class UnionAllOperation : AbstractOperation
{
    private readonly List<IOperation> _operations = new List<IOperation>();

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach (var operation in _operations)
            foreach (var row in operation.Execute(null))
                yield return row;
    }

    public UnionAllOperation Add(IOperation operation)
    {
        _operations.Add(operation);
        return this;
    }
}

The problem is that this does everything synchronously. The following code is a better implementation, but note that this is notepad code, with all the implications of that.

public class UnionAllOperation : AbstractOperation
{
    private readonly List<IOperation> _operations = new List<IOperation>();

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        var blockingCollection = new BlockingCollection<Row>();
        var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() =>
        {
            foreach (var row in currentOp.Execute(null))
            {
                blockingCollection.Add(row);
            }
            blockingCollection.Add(null); // free the consumer thread
        }));

        Row r;
        while (true)
        {
            if (tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) // all done
                break;
            r = blockingCollection.Take();
            if (r == null)
                continue;
            yield return r;
        }
        while (blockingCollection.TryTake(out r))
        {
            if (r == null)
                continue;
            yield return r;
        }
        Task.WaitAll(tasks.ToArray()); // raise any exceptions that were raised during execution
    }

    public UnionAllOperation Add(IOperation operation)
    {
        _operations.Add(operation);
        return this;
    }
}

Usual caveats apply, notepad code, never actually run it, much less tested / debugged it.

Feel free to rip into it, though.

Dale Newman made some improvements, the most important one being to make sure that we aren’t going to evaluate the tasks several times (oops! I told ya it was notepad code), and now it looks like this:

/// <summary>
/// Combines rows from all operations.
/// </summary>
public class UnionAllOperation : AbstractOperation {

    private readonly List<IOperation> _operations = new List<IOperation>();

    /// <summary>
    /// Executes the added operations in parallel.
    /// </summary>
    /// <param name="rows"></param>
    /// <returns></returns>
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows) {

        var blockingCollection = new BlockingCollection<Row>();

        Debug("Creating tasks for {0} operations.", _operations.Count);

        var tasks = _operations.Select(currentOp => Task.Factory.StartNew(() => {
            Trace("Executing {0} operation.", currentOp.Name);
            foreach (var row in currentOp.Execute(null)) {
                blockingCollection.Add(row);
            }
            blockingCollection.Add(null); // free the consumer thread
        })).ToArray();

        Row r;
        while (true) {
            if (tasks.All(x => x.IsFaulted || x.IsCanceled || x.IsCompleted)) {
                Debug("All tasks have been canceled, have faulted, or have completed.");
                break;
            }

            r = blockingCollection.Take();
            if (r == null)
                continue;

            yield return r;
        }

        while (blockingCollection.TryTake(out r)) {
            if (r == null)
                continue;
            yield return r;
        }

        Task.WaitAll(tasks); // raise any exceptions that were raised during execution
    }

    /// <summary>
    /// Initializes this instance
    /// </summary>
    /// <param name="pipelineExecuter">The current pipeline executer.</param>
    public override void PrepareForExecution(IPipelineExecuter pipelineExecuter) {
        foreach (var operation in _operations) {
            operation.PrepareForExecution(pipelineExecuter);
        }
    }

    /// <summary>
    /// Add operation parameters
    /// </summary>
    /// <param name="ops">operations delimited by commas</param>
    /// <returns></returns>
    public UnionAllOperation Add(params IOperation[] ops) {
        foreach (var operation in ops) {
            _operations.Add(operation);
        }
        return this;
    }

    /// <summary>
    /// Add operations
    /// </summary>
    /// <param name="ops">an enumerable of operations</param>
    /// <returns></returns>
    public UnionAllOperation Add(IEnumerable<IOperation> ops) {
        _operations.AddRange(ops);
        return this;
    }

}
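As a usage sketch (the source and destination operation names here are hypothetical, just to show the wiring), the union would be registered in a process like any other operation:

public class ImportFromAllSourcesProcess : EtlProcess
{
    public override void Initialize()
    {
        // hypothetical operations, for illustration only
        Register(new UnionAllOperation()
                     .Add(new ReadCustomersFromLegacyDb())
                     .Add(new ReadCustomersFromCsvFile()));
        Register(new WriteCustomersToTargetDb());
    }
}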
time to read 4 min | 800 words

I am currently working with a customer on some issues that they have with moving to RavenDB, and we ran into a set of problems with their (Legacy with a capital L) relational database.

They ran into several problems with how to create a good ETL process on top of it, especially with regard to how to detect changes and the tendency of the legacy system to re-use old primary keys.

The solution for both is actually fairly easy. Instead of relying on the primary keys of the legacy system, which can be re-used and create a ton of trouble downstream, create your own ids, distinct from the legacy system’s ids.

That can be done easily enough by issuing:

ALTER TABLE Customers ADD UniqueKeyForEtl uniqueidentifier NOT NULL DEFAULT(newid())

This is a non-breaking change; that is, you can do it on any database without fearing that it would somehow break any application that is using it. The good thing about this is that it now ensures that every row in the table has a unique, never repeating, never re-used key. This is a good approach because it also comes at a very low cost.
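As a purely hypothetical sketch of how the ETL side might then use that column (the document id convention and the operation below are my illustration, not something from the original post):

// Hypothetical sketch: building a stable target id from the new GUID column.
// The "customers/" prefix and the row key names are illustrative assumptions.
public class AssignStableIds : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach (Row row in rows)
        {
            // the GUID never changes and is never re-used, so re-running the ETL
            // updates the same target document instead of creating a duplicate
            row["Id"] = "customers/" + row["UniqueKeyForEtl"];
            yield return row;
        }
    }
}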

The next problem was how to actually detect changes. The Legacy System does have a LastModified column on some tables, and actually bothers to update it in some cases, but not in all of them. Again, the answer is to add a column to the table. The easiest option would probably be to just ensure that LastModified is updated in a trigger, something like:

CREATE TRIGGER UpdateCustomersLastModifiedDate ON Customers
FOR UPDATE 
AS
UPDATE Customers SET LastModified = getdate()
FROM Customers INNER JOIN Inserted ON Customers.[UniqueID] = Inserted.[UniqueID]

Maybe with a check to skip the update if the Legacy System already updated it.

The problem is that the Legacy System has so many triggers already that the client is very reluctant to add another one. So another option is to use the rowversion feature in SQL Server. This allows us to define the following:

ALTER TABLE Customers ADD ModifiedVersionForEtl rowversion NOT NULL 

The rowversion will be incremented by the DB on every write, so you can find all the rows that have been updated since the last version that you have seen. This isn’t a trigger, since it happens as part of the actual update process, and is likely to be significantly cheaper.
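A minimal sketch of how the ETL read side could use that column (plain ADO.NET; the lastSeenVersion variable and the selected column names are just illustrative assumptions):

// Minimal sketch, assuming plain ADO.NET and that we persist the highest
// rowversion we have already processed (lastSeenVersion, an 8 byte array).
using (var connection = new SqlConnection(connectionString))
using (var command = connection.CreateCommand())
{
    command.CommandText =
        @"SELECT UniqueKeyForEtl, Name, Email, ModifiedVersionForEtl
          FROM Customers
          WHERE ModifiedVersionForEtl > @lastSeenVersion
          ORDER BY ModifiedVersionForEtl";
    command.Parameters.AddWithValue("@lastSeenVersion", lastSeenVersion);

    connection.Open();
    using (var reader = command.ExecuteReader())
    {
        while (reader.Read())
        {
            // process only the rows that changed since the last run, and remember
            // the highest ModifiedVersionForEtl value that we have seen
        }
    }
}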

By adding these two columns, an operation that is safe to make since it can’t break any code that uses the database, we have given ourselves an easy way to detect changes, and an easy way to get keys that are actually unique and non-repeating.

time to read 1 min | 124 words

There have been some changes, and it seems that it is hard to track them. Here is where you can find the master repositories for the Rhino Tools projects:

Rhino ETL Video

time to read 1 min | 107 words

Paul Barriere has a video up of a presentation about Rhino ETL:

ETL stands for Extract, Transform, Load. For example, you receive files or other data from vendors or other third parties which you need to manipulate in some way and then insert into your own database. Rhino ETL is an open source C# package that I have used for dozens of production processes quite successfully. By using C# for your ETL tasks you can create testable, reusable components more easily than with tools like SSIS and DTS.

It is good to see more information available on Rhino ETL.

On PSake

time to read 7 min | 1212 words

James Kovacs introduced psake (a PowerShell based build system) over a year ago, and at the time, I gave it a glance and decided that it was interesting, but not worth further investigation.

This weekend, as I was restructuring my Rhino Tools project, I realized that I needed to touch the build system as well. The Rhino Tools build system has been through several projects, and was originally ported from Hibernate. It is NAnt based, complex, and can do just about everything that you want except be easily understandable.

It became clear to me very quickly that it ain’t going to be easy to change the way it works, nor would it be easy to modify it to reflect the new structure. There are other issues with complex build systems: they tend to create zones of “there be dragons”, where only the initiated go, and even they go with trepidation. I decided to take advantage of the changes that I am already making to get a simpler build system.

I had a couple of options open to me: Rake and Bake.

Bake seemed natural, until I remembered that no one has touched it in a year or two. Besides, I can only stretch NIH so far :-). And while I know that people rave about Rake, I did not want to introduce a Ruby dependency into my build system. I know that it was an annoyance when I had to build Fluent NHibernate.

One thing that I knew I was not willing to go back to was editing XML, so I started looking at other build systems, and ended up running into PSake.

There are a few interesting things that reading about it brought to mind. First, NAnt doesn’t cut it anymore. It can’t build WPF applications or handle multi-targeting well. Second, I am already managing the compilation part of the build using MSBuild, thanks to Visual Studio.

That leaves the build system with executing msbuild, setting up directories, executing tests, running post-build tools, etc.

PSake handles those well, since the execution environment is the command line. The syntax is nice, just enough to specify tasks and dependencies, but everything else is just pure command line. The following is the Rhino Mocks build script, using PSake:

properties { 
  $base_dir  = resolve-path .
  $lib_dir = "$base_dir\SharedLibs"
  $build_dir = "$base_dir\build" 
  $buildartifacts_dir = "$build_dir\" 
  $sln_file = "$base_dir\Rhino.Mocks-vs2008.sln" 
  $version = "3.6.0.0"
  $tools_dir = "$base_dir\Tools"
  $release_dir = "$base_dir\Release"
} 

task default -depends Release

task Clean { 
  remove-item -force -recurse $buildartifacts_dir -ErrorAction SilentlyContinue 
  remove-item -force -recurse $release_dir -ErrorAction SilentlyContinue 
} 

task Init -depends Clean { 
    . .\psake_ext.ps1
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks $version" `
        -version $version `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks.Tests\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks Tests $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks Tests $version" `
        -version $version `
        -clsCompliant "false" `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks.Tests.Model\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks Tests Model $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks Tests Model $version" `
        -version $version `
        -clsCompliant "false" `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    new-item $release_dir -itemType directory 
    new-item $buildartifacts_dir -itemType directory 
    cp $tools_dir\MbUnit\*.* $build_dir
} 

task Compile -depends Init { 
  exec msbuild "/p:OutDir=""$buildartifacts_dir "" $sln_file"
} 

task Test -depends Compile {
  $old = pwd
  cd $build_dir
  exec ".\MbUnit.Cons.exe" "$build_dir\Rhino.Mocks.Tests.dll"
  cd $old        
}

task Merge {
    $old = pwd
    cd $build_dir
    
    Remove-Item Rhino.Mocks.Partial.dll -ErrorAction SilentlyContinue 
    Rename-Item $build_dir\Rhino.Mocks.dll Rhino.Mocks.Partial.dll
    
    & $tools_dir\ILMerge.exe Rhino.Mocks.Partial.dll `
        Castle.DynamicProxy2.dll `
        Castle.Core.dll `
        /out:Rhino.Mocks.dll `
        /t:library `
        "/keyfile:$base_dir\ayende-open-source.snk" `
        "/internalize:$base_dir\ilmerge.exclude"
    if ($lastExitCode -ne 0) {
        throw "Error: Failed to merge assemblies!"
    }
    cd $old
}

task Release -depends Test, Merge {
    & $tools_dir\zip.exe -9 -A -j `
        $release_dir\Rhino.Mocks.zip `
        $build_dir\Rhino.Mocks.dll `
        $build_dir\Rhino.Mocks.xml `
        license.txt `
        acknowledgements.txt
    if ($lastExitCode -ne 0) {
        throw "Error: Failed to execute ZIP command"
    }
}

It is about 50 lines, all told, with a lot of spaces and is quite readable.

This handles the same tasks as the old set of scripts did, and it does this without undue complexity. I like it.

time to read 2 min | 398 words

This post is about the Rhino Tools project. It has been running for a long time now, over 5 years, and amassed quite a few projects in it.

I really like the codebase of the projects in Rhino Tools, but secondary aspects have been creeping in that made managing the project harder. In particular, putting all the projects in a single repository made things easy, far too easy. Projects had an easy time taking dependencies that they shouldn’t, and the entire build process was… complex, to say the least.

I have been somewhat unhappily tolerant of this so far because, while it was annoying, it didn’t actively create problems for me. The problems started creeping in when I wanted to move Rhino Tools to use NHibernate 2.1. That is when I realized that this was going to be a very painful process, since I would have to take on the entire Rhino Tools set of projects in one go, instead of dealing with each of them independently. The fact that so many of the dependencies were in Rhino Commons, for which I have a profound dislike, helped increase my frustration.

There are other things that I find annoying now. Rhino Security is a general purpose library for NHibernate, but it makes a lot of assumptions about how it is going to be used, which is wrong. Rhino ETL had a dependency on Rhino Commons because of three classes.

To resolve that, I decided to make a few other changes: taking dependencies is supposed to be a hard process; it is supposed to make you think.

I have been working on splitting Rhino Tools into all its sub projects, so each of them is independent of all the others. That increases the effort of managing all of them as a unit, but decreases the effort of managing them independently.

The current goals are to:

  • Make it simpler to treat each project independently
  • Make it easier to deal with the management of each project (dependencies, build scripts)

There is a side line in which I am also learning to use Git, and there is a high likelihood that the separate Rhino Tools projects will move to GitHub. Subversion’s patching & tracking capabilities annoyed me for the very last time about a week ago.

Rhino ETL 2.0

time to read 11 min | 2021 words

Rhino ETL was born out of a need. I need to do a lot of ETL type operations. Those include anything from moving data from legacy databases to my database, importing files, importing data over web services, etc. For a while, I have used SSIS for those needs. It has proven... inadequate. Mostly in terms of ease of development, deployment, error handling, etc.

This is my third attempt at building an ETL tool. The third time is much shorter and clearer than both previous attempts.

The goals for the project were:

  • Developer friendly:
    • Errors
    • Development
    • Deployment
  • Performant - I need to move large amounts of data around, and I need to do it fast.
  • Easy - The hard part should be handling the required transforms, dealing with the logic for in/out, etc.
  • Unlimited - You should not be limited to what the tool gives you, and you should be able to integrate easily with other tools
  • Language agnostic - You should be able to develop solutions for this using C#/VB/Boo
  • DSL - Provide a DSL to make it even easier to work with

The overall concept is based around two classes, AbstractOperation and EtlProcess, and the idea of a pipeline.

Here is a simple operation, which just generates all the even numbers up to a million:

public class EvenNumberToMillion : AbstractOperation
{
	public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
	{
		for(int i = 2; i< 1000000; i += 2)
		{
			Row row = new Row();
			row["number"] = i;
			yield return row;
		}
	}
}

This is an input operation: it ignores its rows parameter and yields rows that are generated some other way. As you can see, we yield a row for each iteration.

We combine operations into a pipeline using a process. If we wanted to print the numbers, we would build the following pipeline process:

public class PrintNumbersProcess : EtlProcess
{
	public override void Initialize()
	{
		Register(new EvenNumberToMillion());
		Register(new PrintNumbers());
	}
}
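The PrintNumbers operation itself is not shown in the post; a minimal sketch of what it might look like (the body is an assumption):

public class PrintNumbers : AbstractOperation
{
    // Minimal sketch; the body is an assumption, not code from the original post.
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach (Row row in rows)
        {
            Console.WriteLine(row["number"]); // "number" is set by EvenNumberToMillion
            yield return row;                 // pass the row along for any following operation
        }
    }
}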

All the output of the first operation goes into the second operation, and so on and so forth. Using the DSL it will look like:

operation EvenNumbersToMillion:
	for i in range(1000000,2):
		yield Row(Number: i)

operation PrintNumbers:
	for row in rows:
		print row.Number

process PrintNumbersProcess:
	EvenNumbersToMillion()
	PrintNumbers()

This is just to demonstrate the concept of the pipeline. Now we can get into the interesting operations. As you already surmised, AbstractOperation is the common base class, and you can inherit it to produce rows from any source.

Inputs

Rhino ETL offers special support for getting data from a database. It means that you can define it as simply as:

public class ReadUsers : ConventionInputCommandOperation
{
    public ReadUsers() : base("test")
    {
        Command = "SELECT Id, Name,Email FROM Users";
    }
}

Or, using DSL:

input "test", Command = "SELECT id, name, email  FROM Users"

Note the "test" here, it is the name of the connection string in the app.config.

Outputs

On the output side, we have more interesting options. We can use any custom operation that we want, of course, but for working with databases, we have the following options:

Standard DB commands:

public class FibonacciOutput : ConventionOutputCommandOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

You'll note that I am not specifying the parameters, those are taken implicitly from the current row.

Using DSL:

output "test", Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)"

SQL Batch operations:

public class FibonacciOutput : ConventionSqlBatchOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

Using DSL: Haven't written that yet :-(

Sql Bulk Insert:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
    public FibonacciBulkInsert() : base("test", "Fibonacci")
    {
    }

    protected override void PrepareSchema()
    {
        Schema["id"] = typeof (int);
    }
}

Using DSL: Haven't written that yet :-(

Files

For working with files, we have the support of the excellent FileHelpers library, which makes working with files really easy.

Reading from a file is simply:

public class UserRecord
{
    public string Email;
    public int Id;
    public string Name;
}

public class ReadUsersFromFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        using(FileEngine file = FluentFile.For<UserRecord>().From("users.txt"))
        {
            foreach (object obj in file)
            {
                yield return Row.FromObject(obj);
            }
        }
    }
}

There is a 1:1 translation to Boo here, so I'll spare you that.

Writing is very similar:

public class WriteUsersToFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        FluentFile engine = FluentFile.For<UserRecord>();
        engine.HeaderText = "Id\tName\tEmail";
        using(FileEngine file = engine.To("users.txt"))
        {
            foreach (Row row in rows)
            {
                UserRecord record = new UserRecord();
                
                record.Id = (int)row["id"];
                record.Name = (string)row["name"];
                record.Email = (string)row["email"];

                file.Write(record);
            }
        }
        yield break;
    }
}

Joins

Joins are an interesting concept, and I have been playing with them quite extensively recently. Joins in Rhino ETL are implemented as sub processes. Hash joins are very simple:

public class JoinUsersAndIds : JoinOperation
{
	protected override Row MergeRows(Row leftRow, Row rightRow)
	{
		Row row = leftRow.Clone();
		row["user_id"] = rightRow["new_id"];
		return row;
	}

	protected override void SetupJoinConditions()
	{
		InnerJoin
			.Left("id")
			.Right("old_id");
	}
}

This is just the operation, you hook it up in the process using:

Register(new JoinUsersAndIds()
         	.Left(new GenerateUsers(25000))
         	.Right(new GenerateRandomIds(15000)));

Each side is capable of accepting a full blown sub process on its own.

Nested loops joins are appropriate for the more complex cases:

public class FullJoinUsersToPeopleByEmail : NestedLoopsJoinOperation
{
    protected override bool MatchJoinCondition(Row leftRow, Row rightRow)
    {
        return LeftJoin(leftRow["email"], rightRow["email"]);
    }

    protected override Row MergeRows(Row leftRow, Row rightRow)
    {
        Row row = new Row();
        row.Copy(leftRow);
        row["person_id"] = rightRow["id"];
        return row;
    }
}

Using DSL it looks like this:

join get_user_roles:
	left:
		input "test", Command = "SELECT id, name, email  FROM Users"
		
	right:
		nput "test", Command = "SELECT userid, roleid FROM UsersToRoles"
			
	on left.id ==  right.userid:
		row.Name = left.Name
		row.Role = right.RoleId

(A note about this syntax: it currently generates a nested loops join; I intend to make it generate an optimized version soon.)

Branching

That was quite a challenge to implement; I kept missing a key point, and that tripped me up for a while. Here is how to send a row to several sources:

BranchingOperation split = new BranchingOperation()
	.Add(Partial
		.Register(new MultiplyByThreeOperation())
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()))
	.Add(Partial
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()));
Register(split);

(Note, the implementation is not as optimized as it could be.)

Aggregates

Well, you got to have those, don't you? Here is a simple Row Count aggregate:

public class RowCount : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        if (aggregate["count"] == null)
            aggregate["count"] = 0;

        int count = (int)aggregate["count"];
        aggregate["count"] = count + 1;
    }
}

We are called once per row, and can accumulate all the values that we want. We can use grouping to create more interesting results:

public class CostPerProductAggregation : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if(aggregate["cost"]==null)
            aggregate["cost"] = 0;
        aggregate["cost"] = ((int) aggregate["cost"]) + ((int) row["price"]);
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new string[] {"name"};
    }
}

We can also override the FinishAggregation(Row) method to complete any calculations when all the rows are completed. Rhino ETL guarantees that we will get the same aggregate row for all the rows that match the same columns, so that is taken care of.
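As a sketch of what that might look like in C# (an average calculation; the column names and the exact FinishAggregation signature are assumptions based on the description above):

public class AverageCostAggregation : AbstractAggregationOperation
{
    // Hedged sketch: column names ("price", "cost", "count", "average") are assumptions.
    protected override void Accumulate(Row row, Row aggregate)
    {
        if (aggregate["cost"] == null)
            aggregate["cost"] = 0;
        if (aggregate["count"] == null)
            aggregate["count"] = 0;

        aggregate["cost"] = ((int)aggregate["cost"]) + ((int)row["price"]);
        aggregate["count"] = ((int)aggregate["count"]) + 1;
    }

    protected override void FinishAggregation(Row aggregate)
    {
        // called once per aggregate row, after all matching rows have been accumulated
        aggregate["average"] = ((int)aggregate["cost"]) / ((int)aggregate["count"]);
    }
}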

Using DSL for that is simply:

aggregate join_product_names:
	accumulate:
		aggregate.names = [] if aggregate.names is null
		aggregate.names.Add(row.name)
	
	terminate:
		aggregate.result = string.Join(", ", aggregate.names.ToArray(string))

That is about it for now, I think. You can get the source for Rhino ETL here:

https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/rhino-etl

I plan to have a binary release once I am confident that all the notes in this post are no longer relevant.

time to read 2 min | 294 words

I thought about moving from hashtables to Dictionary<T,K>, and I got interesting results.

For simple new Dictionary<string,object>(), I expected a significant improvement, but I got this:

(screenshot of the profiling results)

This is actually much worse than the result of hashtable + ignore case comparison.

When I used that, I got this horrendous result:

(screenshot of the profiling results)

I tried various other tricks, but none of them changed the fact that making 7.5 million calls is going to cost a lot of time. And I want to support more than just 2,500 x 1,500.

I changed the implementation to look like this:

rightRowsByJoinKey = {}
for rightRow in right:
	key = rightRow.CreateKey( rightJoinParams )
	rightRowsByJoinKey[ key ] = [] unless rightRowsByJoinKey[ key ]
	rightRowsByJoinKey[ key ].Add(rightRow)

for leftRow in left:
	key = leftRow.CreateKey( leftJoinParams )
	for matchingRight in rightRowsByJoinKey[ key ] :
		yield MergeRows( leftRow, matchingRight )

Now I have N + M, instead on N*M.
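In C#, the same idea looks roughly like this (a sketch only; Row.CreateKey and MergeRows are taken from the pseudo code above, and this is not the actual Rhino ETL code):

// Rough C# sketch of the same hash join idea. Row, CreateKey and MergeRows are
// assumed to behave like in the pseudo code above.
private IEnumerable<Row> HashJoin(IEnumerable<Row> left, IEnumerable<Row> right,
                                  string[] leftJoinParams, string[] rightJoinParams)
{
    var rightRowsByJoinKey = new Dictionary<object, List<Row>>();
    foreach (Row rightRow in right)
    {
        object key = rightRow.CreateKey(rightJoinParams);
        List<Row> bucket;
        if (rightRowsByJoinKey.TryGetValue(key, out bucket) == false)
            rightRowsByJoinKey[key] = bucket = new List<Row>();
        bucket.Add(rightRow);
    }

    foreach (Row leftRow in left)
    {
        object key = leftRow.CreateKey(leftJoinParams);
        List<Row> matches;
        if (rightRowsByJoinKey.TryGetValue(key, out matches) == false)
            continue; // no match on the right side, skip the row (inner join semantics)
        foreach (Row matchingRight in matches)
            yield return MergeRows(leftRow, matchingRight);
    }
}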

From a performance perspective, it means that doing a nested loops join on 2,500 x 1,500 results in 3.75 million comparisons, which is quite a bit, even for such a small set of rows. It took over 6 seconds to run on my machine.

A hash join, however, will perform a measly 5,000 operations to do the same amount of work. On my machine, 2,500 x 1,500 completes in 0.2 seconds, most of which is spent just initializing the framework.

I took that for a spin with two orders of magnitude more rows: 250,000 x 150,000 completed in 5.48 seconds, which is very encouraging.

Hash join is not applicable if you want to join over anything but equality, which is why we need the nested loops join as well.

time to read 2 min | 308 words

I ran a heavy duty import process yesterday, and we ran into a severe performance issue with Rhino ETL joins. Five joins with about 250,000 records on the initial left side and a few tens of thousands on the right sides took about 2 hours to complete.

That was unacceptable, and I decided that I had to fix this issue. I had a fairly good idea about what the issue was. Rhino ETL supports only nested loops joins at the moment, which means that the join is performed as (pseudo code):

for leftRow in left:
	for rightRow in right:
		if MatchJoinCondition(leftRow, rightRow):
			yield MergeRows(leftRow, rightRow)

Obviously the N*M was what was causing the problem, right? I quickly built a trivial join test, which joined 2,500 rows on the left with 1,500 rows on the right. Trivial stuff, and it should result in 1,500 rows returned.

It executed in 6 seconds. That was a shock.

Well, 1,500 * 2,500 = 3,750,000, but it shouldn’t be that bad.

Then I ran the code under a profiler, and it completed in 29 seconds, but I also saw this:

(profiler screenshot)

It was not the nested loop that cost me all this time, it was the hash table lookups!

The most expensive call was GetHashCodeOfString. We have some globalization stuff here because I told the hashtable to be case insensitive. I tried removing that and ran it under the profiler again; now it dropped to 18 seconds, and we had this cost structure to deal with:

(profiler screenshot)

We still spend almost all of our time just doing hash table lookups, although we dropped by 10 seconds this time.

I don't think that I would have ever considered the cost of simply doing the hashtable lookups as the primary cost of the join operations.

Fluent Pipelines

time to read 2 min | 348 words

I am having a discussion with Jon Skeet about the merits of using Linq and delegates/lambdas for pipelines instead of classes.

I kept saying that I don't really see the point, so I went ahead and implemented this:

GenerateData(10000)//enumerator from 0 .. 10000
	.Where(i => i % 3 == 0)
	.Transform(i => (i * 2).ToString() )
	.Act(i => Console.WriteLine(i))
	.Execute();

This uses the same approach as my previous pipeline, but it does it in C# 3.0, so it can use things like extension methods, which make this nicer. The same is possible in C# 2.0, but it takes a ridiculous amount of code to do.
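The Transform, Act, and Execute extension methods used above are not shown in the post (Where is just the standard Linq operator); here is a minimal sketch of what they might look like, with the names taken from the snippet above and the bodies being my assumptions:

public static class PipelineExtensions
{
    // Minimal sketch; the bodies are assumptions, not code from the original discussion.
    public static IEnumerable<TResult> Transform<TSource, TResult>(
        this IEnumerable<TSource> source, Func<TSource, TResult> transform)
    {
        foreach (var item in source)
            yield return transform(item);
    }

    public static IEnumerable<T> Act<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (var item in source)
        {
            action(item);
            yield return item;
        }
    }

    public static void Execute<T>(this IEnumerable<T> source)
    {
        // force the lazy pipeline to actually run
        foreach (var item in source)
        {
        }
    }
}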

This lambda-based code is much simpler than the class-based pipeline code I have shown before, no?

Why do I use the first approach then?

Scalability.

What we are seeing here is about as trivial as it can get. What happens when we have more complex semantics?

Let us take writing to the database as an example. We need to do quite a bit there, more than we would put in a lambda expression, certainly. We can extract it to a method, but then we run into another problem: we can't do method inheritance. This means that I have no easy way of abstracting the common stuff out. Well, I can use a template method, but that works if and only if I have a single place where I want to change behavior.

As an example of scaling, let us take this piece of code:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
	public FibonacciBulkInsert() : base("test", "Fibonacci")
	{
	}

	protected override void PrepareSchema()
	{
		Schema["id"] = typeof (int);
	}
}

This uses the SqlBulkInsertOperation base class to handle the bulk of the work.

One thing that Jon mentioned that was interesting was the ability to take advantage of Linq specific improvements, such as PLinq. This is indeed a consideration, but upon some reflection on it, I realized that the two are not mutually exclusive. If I want to take advantage of any of that, all I need is to modify the pipeline to iterate using PLinq rather than foreach.
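As a rough sketch of that idea (assuming .NET 4's PLinq API; this is an illustration of swapping the iteration strategy, not the actual pipeline executer):

public static class ParallelPipeline
{
    // Rough sketch, assuming .NET 4's PLinq (System.Linq.ParallelEnumerable).
    // It only illustrates iterating with PLinq rather than a plain foreach.
    public static void ExecuteInParallel<T>(this IEnumerable<T> source, Action<T> finalAct)
    {
        source.AsParallel().ForAll(finalAct); // PLinq partitions the work across cores
    }
}

The rest of the pipeline would stay exactly the same; only the terminal iteration changes.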
